ashb commented on code in PR #54943: URL: https://github.com/apache/airflow/pull/54943#discussion_r2487405318
########## shared/configuration/src/airflow_shared/configuration/parser.py: ########## @@ -0,0 +1,2324 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Shared configuration parser for Airflow distributions.""" + +from __future__ import annotations + +import contextlib +import datetime +import functools +import itertools +import json +import logging +import multiprocessing +import os +import pathlib +import re +import shlex +import stat +import subprocess +import sys +import warnings +from base64 import b64encode +from collections.abc import Generator, Iterable +from configparser import ConfigParser, NoOptionError, NoSectionError +from contextlib import contextmanager +from copy import deepcopy +from io import StringIO +from json.decoder import JSONDecodeError +from re import Pattern +from typing import IO, Any +from urllib.parse import urlsplit + +from packaging.version import parse as parse_version +from typing_extensions import overload + +from .exceptions import AirflowConfigException + +log = logging.getLogger(__name__) + +# show Airflow's deprecation warnings +if not sys.warnoptions: + warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow") + warnings.filterwarnings(action="default", 
category=PendingDeprecationWarning, module="airflow") + +_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$") + +ConfigType = str | int | float | bool +ConfigOptionsDictType = dict[str, ConfigType] +ConfigSectionSourcesType = dict[str, str | tuple[str, str]] +ConfigSourcesType = dict[str, ConfigSectionSourcesType] + +ENV_VAR_PREFIX = "AIRFLOW__" + +# Have to duplicate to avoid imports from core +DEFAULT_SECRETS_SEARCH_PATH = [ + "airflow.secrets.environment_variables.EnvironmentVariablesBackend", + "airflow.secrets.metastore.MetastoreBackend", +] + + +class ConfigModifications: + """ + Holds modifications to be applied when writing out the config. + + :param rename: Mapping from (old_section, old_option) to (new_section, new_option) + :param remove: Set of (section, option) to remove + :param default_updates: Mapping from (section, option) to new default value + """ + + def __init__(self) -> None: + self.rename: dict[tuple[str, str], tuple[str, str]] = {} + self.remove: set[tuple[str, str]] = set() + self.default_updates: dict[tuple[str, str], str] = {} + + def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None: + self.rename[(old_section, old_option)] = (new_section, new_option) + + def add_remove(self, section: str, option: str) -> None: + self.remove.add((section, option)) + + def add_default_update(self, section: str, option: str, new_default: str) -> None: + self.default_updates[(section, option)] = new_default + + +def _parse_sqlite_version(s: str) -> tuple[int, ...]: + match = _SQLITE3_VERSION_PATTERN.match(s) + if match is None: + return () + return tuple(int(p) for p in match.group("version").split(".")) + + +@overload +def expand_env_var(env_var: None) -> None: ... + + +@overload +def expand_env_var(env_var: str) -> str: ... + + +def expand_env_var(env_var: str | None) -> str | None: + """ + Expand (potentially nested) env vars. 
+ + Repeat and apply `expandvars` and `expanduser` until + interpolation stops having any effect. + """ + if not env_var or not isinstance(env_var, str): + return env_var + while True: + interpolated = os.path.expanduser(os.path.expandvars(str(env_var))) + if interpolated == env_var: + return interpolated + env_var = interpolated + + +def run_command(command: str) -> str: + """Run command and returns stdout.""" + process = subprocess.Popen( + shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True + ) + output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate()) + + if process.returncode != 0: + # Import exception dynamically to avoid circular imports + + raise AirflowConfigException( + f"Cannot execute {command}. Error code is: {process.returncode}. " + f"Output: {output}, Stderr: {stderr}" + ) + + return output + + +def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: + """ + Check if the config is a template. 
+ + :param configuration_description: description of configuration + :param section: section + :param key: key + :return: True if the config is a template + """ + return configuration_description.get(section, {}).get(key, {}).get("is_template", False) + + +def get_all_expansion_variables() -> dict[str, Any]: + """Get all variables available for configuration expansion.""" + # Use pathlib for cleaner path handling + current_file = pathlib.Path(__file__) + repo_root = current_file.parent.parent.parent.parent.parent.parent + + # Set up dags folder for unit tests + _TEST_DAGS_FOLDER = repo_root / "airflow-core" / "tests" / "unit" / "dags" + if _TEST_DAGS_FOLDER.exists(): + TEST_DAGS_FOLDER = str(_TEST_DAGS_FOLDER) + else: + TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags") + + # Set up plugins folder for unit tests + _TEST_PLUGINS_FOLDER = repo_root / "airflow-core" / "tests" / "unit" / "plugins" + if _TEST_PLUGINS_FOLDER.exists(): + TEST_PLUGINS_FOLDER = str(_TEST_PLUGINS_FOLDER) + else: + TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins") + + return { + "AIRFLOW_HOME": AIRFLOW_HOME, + "AIRFLOW_CONFIG": AIRFLOW_CONFIG, + "FERNET_KEY": FERNET_KEY, + "JWT_SECRET_KEY": JWT_SECRET_KEY, + "SECRET_KEY": SECRET_KEY, + "TEST_DAGS_FOLDER": TEST_DAGS_FOLDER, + "TEST_PLUGINS_FOLDER": TEST_PLUGINS_FOLDER, + } + + +def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser: + """ + Create default config parser based on configuration description. + + It creates ConfigParser with all default values retrieved from the configuration description and + expands all the variables from the global and local variables defined in this module. + + :param configuration_description: configuration description - retrieved from config.yaml files + following the schema defined in "config.yml.schema.json" in the config_templates folder. + :return: Default Config Parser that can be used to read configuration values from. 
+ """ + parser = ConfigParser() + all_vars = get_all_expansion_variables() + for section, section_desc in configuration_description.items(): + parser.add_section(section) + options = section_desc["options"] + for key in options: + default_value = options[key]["default"] + is_template = options[key].get("is_template", False) + if default_value is not None: + if is_template or not isinstance(default_value, str): + parser.set(section, key, default_value) + else: + parser.set(section, key, default_value.format(**all_vars)) + return parser + + +def _import_string(dotted_path: str): + """ + Import a dotted module path and return the attribute/class designated by the last name in the path. + + Raise ImportError if the import failed. + """ + # TODO: Add support for nested classes. Currently, it only works for top-level classes. + from importlib import import_module + + try: + module_path, class_name = dotted_path.rsplit(".", 1) + except ValueError: + raise ImportError(f"{dotted_path} doesn't look like a module path") + + module = import_module(module_path) + + try: + return getattr(module, class_name) + except AttributeError: + raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class') + + +class AirflowConfigParser(ConfigParser): + """ + Custom Airflow Configparser supporting defaults and deprecated options. + + This is a subclass of ConfigParser that supports defaults and deprecated options. + + The defaults are stored in the ``_default_values``. The configuration description keeps + description of all the options available in Airflow (description follow config.yaml.schema). + + :param default_config: default configuration (in the form of ini file). 
+ :param configuration_description: description of configuration to use + """ + + def __init__( + self, + config_templates_dir: str = "", + default_config: str | None = None, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + if config_templates_dir: + self.config_templates_dir = config_templates_dir + else: + self.config_templates_dir = find_config_templates_dir() + self.configuration_description = retrieve_configuration_description(include_providers=False) + self.upgraded_values = {} + # For those who would like to use a different data structure to keep defaults: + # We have to keep the default values in a ConfigParser rather than in any other + # data structure, because the values we have might contain %% which are ConfigParser + # interpolation placeholders. The _default_values config parser will interpolate them + # properly when we call get() on it. + self._default_values = create_default_config_parser(self.configuration_description) + self._provider_config_fallback_default_values = self.create_provider_config_fallback_defaults() + if default_config is not None: + self._update_defaults_from_string(default_config) + self._update_logging_deprecated_template_to_one_from_defaults() + self.is_validated = False + self._suppress_future_warnings = False + self._providers_configuration_loaded = False + + def _get_config_file_path(self, file_name: str) -> str: + """ + Get path to config file template. + + :param file_name: Name of the config file + :return: Full path to the config file + """ + return os.path.join(self.config_templates_dir, file_name) + + def create_provider_config_fallback_defaults(self) -> ConfigParser: + """ + Create fallback defaults. + + This parser contains provider defaults for Airflow configuration, containing fallback default values + that might be needed when provider classes are being imported - before provider's configuration + is loaded. 
+ + Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead + to retrieving provider configuration before the defaults for the provider are loaded. + + Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or + environment variables) those will be used as usual. + + NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication, + at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed. + + You've been warned! + """ + config_parser = ConfigParser() + try: + config_parser.read(self._get_config_file_path("provider_config_fallback_defaults.cfg")) + except FileNotFoundError: + # If file doesn't exist, return empty parser + pass + return config_parser + + def _update_logging_deprecated_template_to_one_from_defaults(self): + default = self.get_default_value("logging", "log_filename_template") + if default: + # Tuple does not support item assignment, so we have to create a new tuple and replace it + original_replacement = self.deprecated_values["logging"]["log_filename_template"] + self.deprecated_values["logging"]["log_filename_template"] = ( + original_replacement[0], + default, + ) + + def is_template(self, section: str, key) -> bool: + """ + Return whether the value is templated. + + :param section: section of the config + :param key: key in the section + :return: True if the value is templated + """ + if self.configuration_description is None: + return False + return _is_template(self.configuration_description, section, key) + + def _update_defaults_from_string(self, config_string: str): + """ + Update the defaults in _default_values based on values in config_string ("ini" format). + + Note that those values are not validated and cannot contain variables because we are using + regular config parser to load them. This method is used to test the config parser in unit tests. 
+ + :param config_string: ini-formatted config string + """ + parser = ConfigParser() + parser.read_string(config_string) + for section in parser.sections(): + if section not in self._default_values.sections(): + self._default_values.add_section(section) + errors = False + for key, value in parser.items(section): + if not self.is_template(section, key) and "{" in value: + errors = True + log.error( + "The %s.%s value %s read from string contains variable. This is not supported", + section, + key, + value, + ) + self._default_values.set(section, key, value) + if errors: + raise AirflowConfigException( + f"The string config passed as default contains variables. " + f"This is not supported. String config: {config_string}" + ) + + def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: + """ + Retrieve default value from default config parser. + + This will retrieve the default value from the default config parser. Optionally a raw, stored + value can be retrieved by setting skip_interpolation to True. This is useful for example when + we want to write the default value to a file, and we don't want the interpolation to happen + as it is going to be done later when the config is read. 
+ + :param section: section of the config + :param key: key to use + :param fallback: fallback value to use + :param raw: if raw, then interpolation will be reversed + :param kwargs: other args + :return: + """ + value = self._default_values.get(section, key, fallback=fallback, **kwargs) + if raw and value is not None: + return value.replace("%", "%%") + return value + + def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: + """Get provider config fallback default values.""" + return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs) + + # These configuration elements can be fetched as the stdout of commands + # following the "{section}__{name}_cmd" pattern, the idea behind this + # is to not store password on boxes in text files. + # These configs can also be fetched from Secrets backend + # following the "{section}__{name}__secret" pattern + @functools.cached_property + def sensitive_config_values(self) -> set[tuple[str, str]]: + if self.configuration_description is None: + return set() + flattened = { + (s, k): item + for s, s_c in self.configuration_description.items() + for k, item in s_c.get("options").items() # type: ignore[union-attr] + } + sensitive = { + (section.lower(), key.lower()) + for (section, key), v in flattened.items() + if v.get("sensitive") is True + } + depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} + depr_section = { + (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections + } + sensitive.update(depr_section, depr_option) + return sensitive + + # A mapping of (new section, new option) -> (old section, old option, since_version). + # When reading new option, the old option will be checked to see if it exists. 
If it does a + # DeprecationWarning will be issued and the old option will be used instead + deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { + ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), + ("api", "host"): ("webserver", "web_server_host", "3.0"), + ("api", "port"): ("webserver", "web_server_port", "3.0"), + ("api", "workers"): ("webserver", "workers", "3.0"), + ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), + ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), + ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), + ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), + ("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"), + ("api", "expose_config"): ("webserver", "expose_config", "3.0.1"), + ("fab", "access_denied_message"): ("webserver", "access_denied_message", "3.0.2"), + ("fab", "expose_hostname"): ("webserver", "expose_hostname", "3.0.2"), + ("fab", "navbar_color"): ("webserver", "navbar_color", "3.0.2"), + ("fab", "navbar_text_color"): ("webserver", "navbar_text_color", "3.0.2"), + ("fab", "navbar_hover_color"): ("webserver", "navbar_hover_color", "3.0.2"), + ("fab", "navbar_text_hover_color"): ("webserver", "navbar_text_hover_color", "3.0.2"), + ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"), + ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"), + ("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.4"), + ("api", "grid_view_sorting_order"): ("webserver", "grid_view_sorting_order", "3.1.0"), + ("api", "log_fetch_timeout_sec"): ("webserver", "log_fetch_timeout_sec", "3.1.0"), + ("api", "hide_paused_dags_by_default"): ("webserver", "hide_paused_dags_by_default", "3.1.0"), + ("api", "page_size"): ("webserver", "page_size", "3.1.0"), + ("api", "default_wrap"): ("webserver", "default_wrap", "3.1.0"), + ("api", "auto_refresh_interval"): 
("webserver", "auto_refresh_interval", "3.1.0"), + ("api", "require_confirmation_dag_change"): ("webserver", "require_confirmation_dag_change", "3.1.0"), + ("api", "instance_name"): ("webserver", "instance_name", "3.1.0"), + ("api", "log_config"): ("api", "access_logfile", "3.1.0"), Review Comment: Nothing concrete, it just felt a bit odd to have all these settings, which are mostly related to airflow-core, in the shared package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
