amoghrajesh commented on code in PR #54943:
URL: https://github.com/apache/airflow/pull/54943#discussion_r2476964654

##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -0,0 +1,2324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Shared configuration parser for Airflow distributions."""
+
+from __future__ import annotations
+
+import contextlib
+import datetime
+import functools
+import itertools
+import json
+import logging
+import multiprocessing
+import os
+import pathlib
+import re
+import shlex
+import stat
+import subprocess
+import sys
+import warnings
+from base64 import b64encode
+from collections.abc import Generator, Iterable
+from configparser import ConfigParser, NoOptionError, NoSectionError
+from contextlib import contextmanager
+from copy import deepcopy
+from io import StringIO
+from json.decoder import JSONDecodeError
+from re import Pattern
+from typing import IO, Any
+from urllib.parse import urlsplit
+
+from packaging.version import parse as parse_version
+from typing_extensions import overload
+
+from .exceptions import AirflowConfigException
+
+log = logging.getLogger(__name__)
+
+# show Airflow's deprecation warnings
+if not sys.warnoptions:
+    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
+    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")
+
+_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")
+
+ConfigType = str | int | float | bool
+ConfigOptionsDictType = dict[str, ConfigType]
+ConfigSectionSourcesType = dict[str, str | tuple[str, str]]
+ConfigSourcesType = dict[str, ConfigSectionSourcesType]
+
+ENV_VAR_PREFIX = "AIRFLOW__"
+
+# Have to duplicate to avoid imports from core
+DEFAULT_SECRETS_SEARCH_PATH = [
+    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+    "airflow.secrets.metastore.MetastoreBackend",
+]
+
+
+class ConfigModifications:
+    """
+    Holds modifications to be applied when writing out the config.
+
+    :param rename: Mapping from (old_section, old_option) to (new_section, new_option)
+    :param remove: Set of (section, option) to remove
+    :param default_updates: Mapping from (section, option) to new default value
+    """
+
+    def __init__(self) -> None:
+        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
+        self.remove: set[tuple[str, str]] = set()
+        self.default_updates: dict[tuple[str, str], str] = {}
+
+    def add_rename(self, old_section: str, old_option: str, new_section: str, new_option: str) -> None:
+        self.rename[(old_section, old_option)] = (new_section, new_option)
+
+    def add_remove(self, section: str, option: str) -> None:
+        self.remove.add((section, option))
+
+    def add_default_update(self, section: str, option: str, new_default: str) -> None:
+        self.default_updates[(section, option)] = new_default
+
+
+def _parse_sqlite_version(s: str) -> tuple[int, ...]:
+    match = _SQLITE3_VERSION_PATTERN.match(s)
+    if match is None:
+        return ()
+    return tuple(int(p) for p in match.group("version").split("."))
+
+
+@overload
+def expand_env_var(env_var: None) -> None: ...
+
+
+@overload
+def expand_env_var(env_var: str) -> str: ...
+
+
+def expand_env_var(env_var: str | None) -> str | None:
+    """
+    Expand (potentially nested) env vars.
+
+    Repeat and apply `expandvars` and `expanduser` until
+    interpolation stops having any effect.
+    """
+    if not env_var or not isinstance(env_var, str):
+        return env_var
+    while True:
+        interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
+        if interpolated == env_var:
+            return interpolated
+        env_var = interpolated
+
+
+def run_command(command: str) -> str:
+    """Run command and returns stdout."""
+    process = subprocess.Popen(
+        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
+    )
+    output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
+
+    if process.returncode != 0:
+        # Import exception dynamically to avoid circular imports
+
+        raise AirflowConfigException(
+            f"Cannot execute {command}. Error code is: {process.returncode}. "
+            f"Output: {output}, Stderr: {stderr}"
+        )
+
+    return output
+
+
+def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool:
+    """
+    Check if the config is a template.
+
+    :param configuration_description: description of configuration
+    :param section: section
+    :param key: key
+    :return: True if the config is a template
+    """
+    return configuration_description.get(section, {}).get(key, {}).get("is_template", False)
+
+
+def get_all_expansion_variables() -> dict[str, Any]:
+    """Get all variables available for configuration expansion."""
+    # Use pathlib for cleaner path handling
+    current_file = pathlib.Path(__file__)
+    repo_root = current_file.parent.parent.parent.parent.parent.parent
+
+    # Set up dags folder for unit tests
+    _TEST_DAGS_FOLDER = repo_root / "airflow-core" / "tests" / "unit" / "dags"
+    if _TEST_DAGS_FOLDER.exists():
+        TEST_DAGS_FOLDER = str(_TEST_DAGS_FOLDER)
+    else:
+        TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")
+
+    # Set up plugins folder for unit tests
+    _TEST_PLUGINS_FOLDER = repo_root / "airflow-core" / "tests" / "unit" / "plugins"
+    if _TEST_PLUGINS_FOLDER.exists():
+        TEST_PLUGINS_FOLDER = str(_TEST_PLUGINS_FOLDER)
+    else:
+        TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")
+
+    return {
+        "AIRFLOW_HOME": AIRFLOW_HOME,
+        "AIRFLOW_CONFIG": AIRFLOW_CONFIG,
+        "FERNET_KEY": FERNET_KEY,
+        "JWT_SECRET_KEY": JWT_SECRET_KEY,
+        "SECRET_KEY": SECRET_KEY,
+        "TEST_DAGS_FOLDER": TEST_DAGS_FOLDER,
+        "TEST_PLUGINS_FOLDER": TEST_PLUGINS_FOLDER,
+    }
+
+
+def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
+    """
+    Create default config parser based on configuration description.
+
+    It creates ConfigParser with all default values retrieved from the configuration description and
+    expands all the variables from the global and local variables defined in this module.
+
+    :param configuration_description: configuration description - retrieved from config.yaml files
+        following the schema defined in "config.yml.schema.json" in the config_templates folder.
+    :return: Default Config Parser that can be used to read configuration values from.
+    """
+    parser = ConfigParser()
+    all_vars = get_all_expansion_variables()
+    for section, section_desc in configuration_description.items():
+        parser.add_section(section)
+        options = section_desc["options"]
+        for key in options:
+            default_value = options[key]["default"]
+            is_template = options[key].get("is_template", False)
+            if default_value is not None:
+                if is_template or not isinstance(default_value, str):
+                    parser.set(section, key, default_value)
+                else:
+                    parser.set(section, key, default_value.format(**all_vars))
+    return parser
+
+
+def _import_string(dotted_path: str):
+    """
+    Import a dotted module path and return the attribute/class designated by the last name in the path.
+
+    Raise ImportError if the import failed.
+    """
+    # TODO: Add support for nested classes. Currently, it only works for top-level classes.
+    from importlib import import_module
+
+    try:
+        module_path, class_name = dotted_path.rsplit(".", 1)
+    except ValueError:
+        raise ImportError(f"{dotted_path} doesn't look like a module path")
+
+    module = import_module(module_path)
+
+    try:
+        return getattr(module, class_name)
+    except AttributeError:
+        raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
+
+
+class AirflowConfigParser(ConfigParser):
+    """
+    Custom Airflow Configparser supporting defaults and deprecated options.
+
+    This is a subclass of ConfigParser that supports defaults and deprecated options.
+
+    The defaults are stored in the ``_default_values``. The configuration description keeps
+    description of all the options available in Airflow (description follow config.yaml.schema).
+
+    :param default_config: default configuration (in the form of ini file).
+    :param configuration_description: description of configuration to use
+    """
+
+    def __init__(
+        self,
+        config_templates_dir: str = "",
+        default_config: str | None = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        if config_templates_dir:
+            self.config_templates_dir = config_templates_dir
+        else:
+            self.config_templates_dir = find_config_templates_dir()

Review Comment:
   Good catch, removed it.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
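
As a reading aid only, and not part of the PR diff or the review thread: ConfigModifications, quoted in the diff above, is a plain container with three collectors. Below is a minimal sketch of how calling code might drive it; the section/option names are made up for illustration, and the import path is inferred from the file path shown in the diff.

    from airflow_shared.configuration.parser import ConfigModifications  # path inferred from the diff, may differ

    # Collect the edits to apply when rewriting a user's airflow.cfg.
    mods = ConfigModifications()

    # Record a rename, a removal, and a default change (hypothetical option names).
    mods.add_rename("core", "sql_alchemy_conn", "database", "sql_alchemy_conn")
    mods.add_remove("core", "load_default_connections")
    mods.add_default_update("logging", "logging_level", "INFO")

    # The instance simply exposes the collected edits for whatever code writes the config out.
    assert mods.rename[("core", "sql_alchemy_conn")] == ("database", "sql_alchemy_conn")
    assert ("core", "load_default_connections") in mods.remove
    assert mods.default_updates[("logging", "logging_level")] == "INFO"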
