jedcunningham commented on code in PR #32604:
URL: https://github.com/apache/airflow/pull/32604#discussion_r1265958755


##########
airflow/configuration.py:
##########
@@ -136,30 +139,121 @@ def _default_config_file_path(file_name: str) -> str:
     return os.path.join(templates_dir, file_name)
 
 
-def default_config_yaml() -> dict[str, Any]:
+def default_configuration_description(
+    include_airflow: bool = True,
+    include_providers: bool = True,
+    selected_provider: str | None = None,
+    config_file_name: str = "config.yml",
+) -> dict[str, dict[str, Any]]:
     """
-    Read Airflow configs from YAML file.
+    Read Airflow configs from YAML file, and merge them with provider configs 
optionally.
 
+    :param include_airflow: Include Airflow configs
+    :param include_providers: Include provider configs
+    :param selected_provider: If specified, include selected provider only
+    :param config_file_name: name of the file in "config_templates" directory 
to read default config from
     :return: Python dictionary containing configs & their info
     """
-    with open(_default_config_file_path("config.yml")) as config_file:
-        return yaml.safe_load(config_file)
+    base_configuration_description: dict[str, dict[str, Any]] = {}
+    if include_airflow:
+        with open(_default_config_file_path(config_file_name)) as config_file:
+            base_configuration_description.update(yaml.safe_load(config_file))
+    if include_providers:
+        from airflow.providers_manager import ProvidersManager
+
+        for provider, config in ProvidersManager().provider_configs:
+            if selected_provider and provider != selected_provider:
+                continue
+            base_configuration_description.update(config)
+    return base_configuration_description
+
+
+def dejinjafy(value: Any) -> str:
+    if isinstance(value, str):
+        if "{{{{" in value:
+            return value.replace("{{{{", "{{").replace("}}}}", "}}")
+        return value.replace("{{", "{").replace("}}", "}")
+    return value
 
 
 class AirflowConfigParser(ConfigParser):
     """Custom Airflow Configparser supporting defaults and deprecated 
options."""
 
+    def __init__(
+        self,
+        default_config: str | dict[str, dict[str, Any]] | None = None,
+        default_config_description: dict[str, dict[str, Any]] | None = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self.default_config_description = default_config_description
+        self.upgraded_values = {}
+        self._default_values: dict[str, dict[str, Any]] = {}
+        if default_config is not None:

Review Comment:
   Might be worth inverting this conditional, the else is pretty far away.



##########
airflow/configuration.py:
##########
@@ -327,32 +421,100 @@ def inversed_deprecated_sections(self):
     def optionxform(self, optionstr: str) -> str:
         return optionstr
 
-    def __init__(self, default_config: str | None = None, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.upgraded_values = {}
-
-        self.airflow_defaults = ConfigParser(*args, **kwargs)
-        if default_config is not None:
-            self.airflow_defaults.read_string(default_config)
-            # Set the upgrade value based on the current loaded default
-            default = self.airflow_defaults.get("logging", 
"log_filename_template", fallback=None)
-            if default:
-                replacement = 
self.deprecated_values["logging"]["log_filename_template"]
-                self.deprecated_values["logging"]["log_filename_template"] = (
-                    replacement[0],
-                    default,
-                    replacement[2],
-                )
-            else:
-                # In case of tests it might not exist
-                with suppress(KeyError):
-                    del 
self.deprecated_values["logging"]["log_filename_template"]
-        else:
-            with suppress(KeyError):
-                del self.deprecated_values["logging"]["log_filename_template"]
-
-        self.is_validated = False
-        self._suppress_future_warnings = False
+    def write(  # type: ignore[override]
+        self,
+        file: IO[str],
+        section: str | None = None,
+        include_examples: bool = True,
+        include_descriptions: bool = True,
+        include_sources: bool = True,
+        include_variables: bool = True,
+        hide_sensitive_values: bool = False,
+        only_defaults: bool = False,
+        **kwargs: Any,
+    ) -> None:
+        """
+        Write  configuration with comments and examples to a file.

Review Comment:
   ```suggestion
           Write configuration with comments and examples to a file.
   ```
   
   super nit :)



##########
airflow/configuration.py:
##########
@@ -136,30 +139,121 @@ def _default_config_file_path(file_name: str) -> str:
     return os.path.join(templates_dir, file_name)
 
 
-def default_config_yaml() -> dict[str, Any]:
+def default_configuration_description(
+    include_airflow: bool = True,
+    include_providers: bool = True,
+    selected_provider: str | None = None,
+    config_file_name: str = "config.yml",
+) -> dict[str, dict[str, Any]]:
     """
-    Read Airflow configs from YAML file.
+    Read Airflow configs from YAML file, and merge them with provider configs 
optionally.
 
+    :param include_airflow: Include Airflow configs
+    :param include_providers: Include provider configs
+    :param selected_provider: If specified, include selected provider only
+    :param config_file_name: name of the file in "config_templates" directory 
to read default config from
     :return: Python dictionary containing configs & their info
     """
-    with open(_default_config_file_path("config.yml")) as config_file:
-        return yaml.safe_load(config_file)
+    base_configuration_description: dict[str, dict[str, Any]] = {}
+    if include_airflow:
+        with open(_default_config_file_path(config_file_name)) as config_file:
+            base_configuration_description.update(yaml.safe_load(config_file))
+    if include_providers:
+        from airflow.providers_manager import ProvidersManager
+
+        for provider, config in ProvidersManager().provider_configs:
+            if selected_provider and provider != selected_provider:
+                continue
+            base_configuration_description.update(config)
+    return base_configuration_description
+
+
+def dejinjafy(value: Any) -> str:
+    if isinstance(value, str):
+        if "{{{{" in value:
+            return value.replace("{{{{", "{{").replace("}}}}", "}}")
+        return value.replace("{{", "{").replace("}}", "}")
+    return value

Review Comment:
   We are just concerned with up to 4 curlies right? This is simpler:
   
   ```suggestion
       if not isinstance(value, str):
           return value
       return value.replace("{{", "{").replace("}}", "}")
   ```
   
   This does the 4->2 and 2->1.



##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -55,14 +55,21 @@ class CeleryKubernetesExecutor(LoggingMixin):
 
     callback_sink: BaseCallbackSink | None = None
 
-    KUBERNETES_QUEUE = conf.get("celery_kubernetes_executor", 
"kubernetes_queue")
+    @property

Review Comment:
   ```suggestion
       @cached_property
   ```
    
    This should be safe to do. (It does need to be imported though)



##########
airflow/configuration.py:
##########
@@ -1474,84 +1679,123 @@ def _generate_fernet_key() -> str:
     return Fernet.generate_key().decode()
 
 
-def initialize_config() -> AirflowConfigParser:
+def _create_values_from_config_description(
+    default_config_description: dict[str, dict[str, Any]]
+) -> dict[str, dict[str, Any]]:
     """
-    Load the Airflow config files.
+    Creates dictionary of default values from configuration description, using
+    based on variables defined in this module (expansion variables).
 
-    Called for you automatically as part of the Airflow boot process.
+    :param default_config_description: description of configuration to read 
including defaults
+    :return: dictionary of default values based on description and variables 
defined in this module
     """
-    global FERNET_KEY, AIRFLOW_HOME, WEBSERVER_CONFIG
-
-    default_config = _parameterized_config_from_template("default_airflow.cfg")
+    all_vars = get_all_expansion_variables()
+    default_values: dict[str, dict[str, Any]] = {}
+    for section, section_desc in default_config_description.items():
+        section_dict: dict[str, Any] = {}
+        default_values[section] = section_dict
+        options = section_desc["options"]
+        for key in options:
+            default_value = options[key]["default"]
+            if default_value is not None:
+                section_dict[key] = (
+                    default_value.format(**all_vars) if 
isinstance(default_value, str) else default_value
+                )
+    return default_values
 
-    local_conf = AirflowConfigParser(default_config=default_config)
 
-    if local_conf.getboolean("core", "unit_test_mode"):
-        # Load test config only
-        if not os.path.isfile(TEST_CONFIG_FILE):
-            from cryptography.fernet import Fernet
+def create_airflow_config_parser(default_config_description: dict[str, 
dict[str, Any]]):
+    """
+    Creates Airflow config parser.
 
-            log.info("Creating new Airflow config file for unit tests in: %s", 
TEST_CONFIG_FILE)
-            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)
+    Creates Airflow config parser based on variables defined in this module 
(expansion variables)
+    and default configuration description passed as parameter which serves as 
source of default values.
 
-            FERNET_KEY = Fernet.generate_key().decode()
+    :param default_config_description: default configuration description - 
retrieved from config*.yaml files
+        following the schema defined in "config.yml.schema.json" in the 
config_templates folder.
+    :return: Airflow config parser that can be used to read configuration 
values from.
+    """
+    return AirflowConfigParser(
+        
default_config=_create_values_from_config_description(default_config_description),
+        default_config_description=default_config_description,
+    )
 
-            with open(TEST_CONFIG_FILE, "w") as file:
-                cfg = _parameterized_config_from_template("default_test.cfg")
-                file.write(cfg)
-            make_group_other_inaccessible(TEST_CONFIG_FILE)
 
-        local_conf.load_test_config()
-    else:
-        # Load normal config
-        if not os.path.isfile(AIRFLOW_CONFIG):
-            from cryptography.fernet import Fernet
+def load_standard_airflow_configuration(airflow_config_parser: 
AirflowConfigParser):
+    """
+    Loads standard airflow configuration.
 
-            log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
-            pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)
+    In case it finds that the configuration file is missing, it will create it 
and write the default
+    configuration values there, based on defaults passed, and will add the 
comments and examples
+    from the default configuration description passed.
 
-            FERNET_KEY = Fernet.generate_key().decode()
+    :param airflow_config_parser: parser to which the configuration will be 
loaded
 
+    """
+    global FERNET_KEY, AIRFLOW_HOME
+    if not os.path.isfile(AIRFLOW_CONFIG):
+        from cryptography.fernet import Fernet
+
+        log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
+        pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)
+        FERNET_KEY = Fernet.generate_key().decode()
+        if airflow_config_parser._default_values and 
airflow_config_parser.default_config_description:
             with open(AIRFLOW_CONFIG, "w") as file:
-                file.write(default_config)
-            make_group_other_inaccessible(AIRFLOW_CONFIG)
-
-        log.info("Reading the config from %s", AIRFLOW_CONFIG)
+                airflow_config_parser.write(file)
+        make_group_other_inaccessible(AIRFLOW_CONFIG)
+    log.info("Reading the config from %s", AIRFLOW_CONFIG)
+    airflow_config_parser.read(AIRFLOW_CONFIG)
+    if airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
+        msg = (
+            "Specifying both AIRFLOW_HOME environment variable and 
airflow_home "
+            "in the config file is deprecated. Please use only the 
AIRFLOW_HOME "
+            "environment variable and remove the config file entry."
+        )
+        if "AIRFLOW_HOME" in os.environ:
+            warnings.warn(msg, category=DeprecationWarning)
+        elif airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
+            warnings.warn(
+                "Specifying airflow_home in the config file is deprecated. As 
you "
+                "have left it at the default value you should remove the 
setting "
+                "from your airflow.cfg and suffer no change in behaviour.",
+                category=DeprecationWarning,
+            )
+        else:
+            # there
+            AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")  
# type: ignore[assignment]
+            warnings.warn(msg, category=DeprecationWarning)
 
-        local_conf.read(AIRFLOW_CONFIG)
 
-        if local_conf.has_option("core", "AIRFLOW_HOME"):
-            msg = (
-                "Specifying both AIRFLOW_HOME environment variable and 
airflow_home "
-                "in the config file is deprecated. Please use only the 
AIRFLOW_HOME "
-                "environment variable and remove the config file entry."
-            )
-            if "AIRFLOW_HOME" in os.environ:
-                warnings.warn(msg, category=DeprecationWarning)
-            elif local_conf.get("core", "airflow_home") == AIRFLOW_HOME:
-                warnings.warn(
-                    "Specifying airflow_home in the config file is deprecated. 
As you "
-                    "have left it at the default value you should remove the 
setting "
-                    "from your airflow.cfg and suffer no change in behaviour.",
-                    category=DeprecationWarning,
-                )
-            else:
-                # there
-                AIRFLOW_HOME = local_conf.get("core", "airflow_home")  # type: 
ignore[assignment]
-                warnings.warn(msg, category=DeprecationWarning)
+def initialize_config() -> AirflowConfigParser:
+    """
+    Load the Airflow config files.
 
-        # They _might_ have set unit_test_mode in the airflow.cfg, we still
-        # want to respect that and then load the unittests.cfg
-        if local_conf.getboolean("core", "unit_test_mode"):
-            local_conf.load_test_config()
+    Called for you automatically as part of the Airflow boot process.
+    """
+    global FERNET_KEY, AIRFLOW_HOME, WEBSERVER_CONFIG

Review Comment:
   ```suggestion
       global WEBSERVER_CONFIG
   ```
   
   Don't we just need this one? We aren't doing anything with the other 2?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to