amoghrajesh commented on code in PR #57744:
URL: https://github.com/apache/airflow/pull/57744#discussion_r2537892762
##########
airflow-core/src/airflow/configuration.py:
##########
@@ -930,1047 +520,13 @@ def mask_secrets(self):
mask_secret_core(value)
mask_secret_sdk(value)
- def _env_var_name(self, section: str, key: str, team_name: str | None =
None) -> str:
- team_component: str = f"{team_name.upper()}___" if team_name else ""
- return f"{ENV_VAR_PREFIX}{team_component}{section.replace('.',
'_').upper()}__{key.upper()}"
-
- def _get_env_var_option(self, section: str, key: str, team_name: str |
None = None):
- # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
OR for team based
- # configuration must have the format
AIRFLOW__{TEAM_NAME}___{SECTION}__{KEY}
- env_var: str = self._env_var_name(section, key, team_name=team_name)
- if env_var in os.environ:
- return expand_env_var(os.environ[env_var])
- # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
- env_var_cmd = env_var + "_CMD"
- if env_var_cmd in os.environ:
- # if this is a valid command key...
- if (section, key) in self.sensitive_config_values:
- return run_command(os.environ[env_var_cmd])
- # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets
Backend)
- env_var_secret_path = env_var + "_SECRET"
- if env_var_secret_path in os.environ:
- # if this is a valid secret path...
- if (section, key) in self.sensitive_config_values:
- return
_get_config_value_from_secret_backend(os.environ[env_var_secret_path])
- return None
-
- def _get_cmd_option(self, section: str, key: str):
- fallback_key = key + "_cmd"
- if (section, key) in self.sensitive_config_values:
- if super().has_option(section, fallback_key):
- command = super().get(section, fallback_key)
- try:
- cmd_output = run_command(command)
- except AirflowConfigException as e:
- raise e
- except Exception as e:
- raise AirflowConfigException(
- f"Cannot run the command for the config section
[{section}]{fallback_key}_cmd."
- f" Please check the {fallback_key} value."
- ) from e
- return cmd_output
- return None
-
- def _get_cmd_option_from_config_sources(
- self, config_sources: ConfigSourcesType, section: str, key: str
- ) -> str | None:
- fallback_key = key + "_cmd"
- if (section, key) in self.sensitive_config_values:
- section_dict = config_sources.get(section)
- if section_dict is not None:
- command_value = section_dict.get(fallback_key)
- if command_value is not None:
- if isinstance(command_value, str):
- command = command_value
- else:
- command = command_value[0]
- return run_command(command)
- return None
-
- def _get_secret_option(self, section: str, key: str) -> str | None:
- """Get Config option values from Secret Backend."""
- fallback_key = key + "_secret"
- if (section, key) in self.sensitive_config_values:
- if super().has_option(section, fallback_key):
- secrets_path = super().get(section, fallback_key)
- return _get_config_value_from_secret_backend(secrets_path)
- return None
-
- def _get_secret_option_from_config_sources(
- self, config_sources: ConfigSourcesType, section: str, key: str
- ) -> str | None:
- fallback_key = key + "_secret"
- if (section, key) in self.sensitive_config_values:
- section_dict = config_sources.get(section)
- if section_dict is not None:
- secrets_path_value = section_dict.get(fallback_key)
- if secrets_path_value is not None:
- if isinstance(secrets_path_value, str):
- secrets_path = secrets_path_value
- else:
- secrets_path = secrets_path_value[0]
- return _get_config_value_from_secret_backend(secrets_path)
- return None
-
- def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
- value = self.get(section, key, _extra_stacklevel=1, **kwargs)
- if value is None:
- raise ValueError(f"The value {section}/{key} should be set!")
- return value
-
- def get_mandatory_list_value(self, section: str, key: str, **kwargs) ->
list[str]:
- value = self.getlist(section, key, **kwargs)
- if value is None:
- raise ValueError(f"The value {section}/{key} should be set!")
- return value
-
- def _get_option_from_defaults(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- """Get config option from default values."""
- if self.get_default_value(section, key) is not None or "fallback" in
kwargs:
- return expand_env_var(self.get_default_value(section, key,
**kwargs))
- return VALUE_NOT_FOUND_SENTINEL
-
- def _get_option_from_provider_fallbacks(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- """Get config option from provider fallback defaults."""
- if self.get_provider_config_fallback_defaults(section, key) is not
None:
- # no expansion needed
- return self.get_provider_config_fallback_defaults(section, key,
**kwargs)
- return VALUE_NOT_FOUND_SENTINEL
-
- def _resolve_deprecated_lookup(
- self,
- section: str,
- key: str,
- lookup_from_deprecated: bool,
- extra_stacklevel: int = 0,
- ) -> tuple[str, str, str | None, str | None, bool]:
- """
- Resolve deprecated section/key mappings and determine deprecated
values.
-
- :param section: Section name (will be lowercased)
- :param key: Key name (will be lowercased)
- :param lookup_from_deprecated: Whether to lookup from deprecated
options
- :param extra_stacklevel: Extra stack level for warnings
- :return: Tuple of (resolved_section, resolved_key, deprecated_section,
deprecated_key, warning_emitted)
- """
- section = section.lower()
- key = key.lower()
- warning_emitted = False
- deprecated_section: str | None = None
- deprecated_key: str | None = None
-
- if not lookup_from_deprecated:
- return section, key, deprecated_section, deprecated_key,
warning_emitted
-
- option_description = (
- self.configuration_description.get(section, {}).get("options",
{}).get(key, {})
- if self.configuration_description
- else {}
- )
- if option_description.get("deprecated"):
- deprecation_reason = option_description.get("deprecation_reason",
"")
- warnings.warn(
- f"The '{key}' option in section {section} is deprecated.
{deprecation_reason}",
- DeprecationWarning,
- stacklevel=2 + extra_stacklevel,
- )
- # For the cases in which we rename whole sections
- if section in self.inversed_deprecated_sections:
- deprecated_section, deprecated_key = (section, key)
- section = self.inversed_deprecated_sections[section]
- if not self._suppress_future_warnings:
- warnings.warn(
- f"The config section [{deprecated_section}] has been
renamed to "
- f"[{section}]. Please update your `conf.get*` call to use
the new name",
- FutureWarning,
- stacklevel=2 + extra_stacklevel,
- )
- # Don't warn about individual rename if the whole section is
renamed
- warning_emitted = True
- elif (section, key) in self.inversed_deprecated_options:
- # Handle using deprecated section/key instead of the new
section/key
- new_section, new_key = self.inversed_deprecated_options[(section,
key)]
- if not self._suppress_future_warnings and not warning_emitted:
- warnings.warn(
- f"section/key [{section}/{key}] has been deprecated, you
should use"
- f"[{new_section}/{new_key}] instead. Please update your
`conf.get*` call to use the "
- "new name",
- FutureWarning,
- stacklevel=2 + extra_stacklevel,
- )
- warning_emitted = True
- deprecated_section, deprecated_key = section, key
- section, key = (new_section, new_key)
- elif section in self.deprecated_sections:
- # When accessing the new section name, make sure we check under
the old config name
- deprecated_key = key
- deprecated_section = self.deprecated_sections[section][0]
- else:
- deprecated_section, deprecated_key, _ =
self.deprecated_options.get(
- (section, key), (None, None, None)
- )
-
- return section, key, deprecated_section, deprecated_key,
warning_emitted
-
- @overload # type: ignore[override]
- def get(self, section: str, key: str, fallback: str = ..., **kwargs) ->
str: ...
-
- @overload # type: ignore[override]
- def get(self, section: str, key: str, **kwargs) -> str | None: ...
-
- def get( # type: ignore[misc, override]
- self,
- section: str,
- key: str,
- suppress_warnings: bool = False,
- lookup_from_deprecated: bool = True,
- _extra_stacklevel: int = 0,
- team_name: str | None = None,
- **kwargs,
- ) -> str | None:
- """
- Get config value by iterating through lookup sequence.
-
- Priority order is defined by _lookup_sequence property.
- """
- section, key, deprecated_section, deprecated_key, warning_emitted =
self._resolve_deprecated_lookup(
- section=section,
- key=key,
- lookup_from_deprecated=lookup_from_deprecated,
- extra_stacklevel=_extra_stacklevel,
- )
-
- if team_name is not None:
- kwargs["team_name"] = team_name
-
- for lookup_method in self._lookup_sequence:
- value = lookup_method(
- deprecated_key=deprecated_key,
- deprecated_section=deprecated_section,
- key=key,
- section=section,
- issue_warning=not warning_emitted,
- extra_stacklevel=_extra_stacklevel,
- **kwargs,
- )
- if value is not VALUE_NOT_FOUND_SENTINEL:
- return value
-
- # Check if fallback was explicitly provided (even if None)
- if "fallback" in kwargs:
- return kwargs["fallback"]
-
- if not suppress_warnings:
- log.warning("section/key [%s/%s] not found in config", section,
key)
-
- raise AirflowConfigException(f"section/key [{section}/{key}] not found
in config")
-
- def _get_option_from_secrets(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- option = self._get_secret_option(section, key)
- if option:
- return option
- if deprecated_section and deprecated_key:
- with self.suppress_future_warnings():
- option = self._get_secret_option(deprecated_section,
deprecated_key)
- if option:
- if issue_warning:
- self._warn_deprecate(section, key, deprecated_section,
deprecated_key, extra_stacklevel)
- return option
- return VALUE_NOT_FOUND_SENTINEL
-
- def _get_option_from_commands(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- option = self._get_cmd_option(section, key)
- if option:
- return option
- if deprecated_section and deprecated_key:
- with self.suppress_future_warnings():
- option = self._get_cmd_option(deprecated_section,
deprecated_key)
- if option:
- if issue_warning:
- self._warn_deprecate(section, key, deprecated_section,
deprecated_key, extra_stacklevel)
- return option
- return VALUE_NOT_FOUND_SENTINEL
-
- def _get_option_from_config_file(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- """Get config option from config file."""
- if team_name := kwargs.get("team_name", None):
- section = f"{team_name}={section}"
- # since this is the last lookup that supports team_name, pop it
- kwargs.pop("team_name")
- if super().has_option(section, key):
- # Use the parent's methods to get the actual config here to be
able to
- # separate the config from default config.
- return expand_env_var(super().get(section, key, **kwargs))
- if deprecated_section and deprecated_key:
- if super().has_option(deprecated_section, deprecated_key):
- if issue_warning:
- self._warn_deprecate(section, key, deprecated_section,
deprecated_key, extra_stacklevel)
- with self.suppress_future_warnings():
- return expand_env_var(super().get(deprecated_section,
deprecated_key, **kwargs))
- return VALUE_NOT_FOUND_SENTINEL
-
- def _get_environment_variables(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- """Get config option from environment variables."""
- team_name = kwargs.get("team_name", None)
- option = self._get_env_var_option(section, key, team_name=team_name)
- if option is not None:
- return option
- if deprecated_section and deprecated_key:
- with self.suppress_future_warnings():
- option = self._get_env_var_option(deprecated_section,
deprecated_key, team_name=team_name)
- if option is not None:
- if issue_warning:
- self._warn_deprecate(section, key, deprecated_section,
deprecated_key, extra_stacklevel)
- return option
- return VALUE_NOT_FOUND_SENTINEL
-
- def getboolean(self, section: str, key: str, **kwargs) -> bool: # type:
ignore[override]
- val = str(self.get(section, key, _extra_stacklevel=1,
**kwargs)).lower().strip()
- if "#" in val:
- val = val.split("#")[0].strip()
- if val in ("t", "true", "1"):
- return True
- if val in ("f", "false", "0"):
- return False
- raise AirflowConfigException(
- f'Failed to convert value to bool. Please check "{key}" key in
"{section}" section. '
- f'Current value: "{val}".'
- )
-
- def getint(self, section: str, key: str, **kwargs) -> int: # type:
ignore[override]
- val = self.get(section, key, _extra_stacklevel=1, **kwargs)
- if val is None:
- raise AirflowConfigException(
- f"Failed to convert value None to int. "
- f'Please check "{key}" key in "{section}" section is set.'
- )
- try:
- return int(val)
- except ValueError:
- raise AirflowConfigException(
- f'Failed to convert value to int. Please check "{key}" key in
"{section}" section. '
- f'Current value: "{val}".'
- )
-
- def getfloat(self, section: str, key: str, **kwargs) -> float: # type:
ignore[override]
- val = self.get(section, key, _extra_stacklevel=1, **kwargs)
- if val is None:
- raise AirflowConfigException(
- f"Failed to convert value None to float. "
- f'Please check "{key}" key in "{section}" section is set.'
- )
- try:
- return float(val)
- except ValueError:
- raise AirflowConfigException(
- f'Failed to convert value to float. Please check "{key}" key
in "{section}" section. '
- f'Current value: "{val}".'
- )
-
- def getlist(self, section: str, key: str, delimiter=",", **kwargs):
- val = self.get(section, key, **kwargs)
- if val is None:
- if "fallback" in kwargs:
- return kwargs["fallback"]
- raise AirflowConfigException(
- f"Failed to convert value None to list. "
- f'Please check "{key}" key in "{section}" section is set.'
- )
- try:
- return [item.strip() for item in val.split(delimiter)]
- except Exception:
- raise AirflowConfigException(
- f'Failed to parse value to a list. Please check "{key}" key in
"{section}" section. '
- f'Current value: "{val}".'
- )
-
- E = TypeVar("E", bound=Enum)
-
- def getenum(self, section: str, key: str, enum_class: type[E], **kwargs)
-> E:
- val = self.get(section, key, **kwargs)
- enum_names = [enum_item.name for enum_item in enum_class]
-
- if val is None:
- raise AirflowConfigException(
- f'Failed to convert value. Please check "{key}" key in
"{section}" section. '
- f'Current value: "{val}" and it must be one of {",
".join(enum_names)}'
- )
-
- try:
- return enum_class[val]
- except KeyError:
- if "fallback" in kwargs and kwargs["fallback"] in enum_names:
- return enum_class[kwargs["fallback"]]
- raise AirflowConfigException(
- f'Failed to convert value. Please check "{key}" key in
"{section}" section. '
- f"the value must be one of {', '.join(enum_names)}"
- )
-
- def getenumlist(self, section: str, key: str, enum_class: type[E],
delimiter=",", **kwargs) -> list[E]:
- string_list = self.getlist(section, key, delimiter, **kwargs)
- enum_names = [enum_item.name for enum_item in enum_class]
- enum_list = []
-
- for val in string_list:
- try:
- enum_list.append(enum_class[val])
- except KeyError:
- log.warning(
- "Failed to convert value. Please check %s key in %s
section. "
- "it must be one of %s, if not the value is ignored",
- key,
- section,
- ", ".join(enum_names),
- )
-
- return enum_list
-
- def getimport(self, section: str, key: str, **kwargs) -> Any:
- """
- Read options, import the full qualified name, and return the object.
-
- In case of failure, it throws an exception with the key and section
names
-
- :return: The object or None, if the option is empty
- """
- full_qualified_path = conf.get(section=section, key=key, **kwargs)
- if not full_qualified_path:
- return None
-
- try:
- return import_string(full_qualified_path)
- except ImportError as e:
- log.warning(e)
- raise AirflowConfigException(
- f'The object could not be loaded. Please check "{key}" key in
"{section}" section. '
- f'Current value: "{full_qualified_path}".'
- )
-
- def getjson(
- self, section: str, key: str, fallback=None, **kwargs
- ) -> dict | list | str | int | float | None:
- """
- Return a config value parsed from a JSON string.
-
- ``fallback`` is *not* JSON parsed but used verbatim when no config
value is given.
- """
- try:
- data = self.get(section=section, key=key, fallback=None,
_extra_stacklevel=1, **kwargs)
- except (NoSectionError, NoOptionError):
- data = None
-
- if data is None or data == "":
- return fallback
-
- try:
- return json.loads(data)
- except JSONDecodeError as e:
- raise AirflowConfigException(f"Unable to parse [{section}] {key!r}
as valid json") from e
-
- def gettimedelta(
- self, section: str, key: str, fallback: Any = None, **kwargs
- ) -> datetime.timedelta | None:
- """
- Get the config value for the given section and key, and convert it
into datetime.timedelta object.
-
- If the key is missing, then it is considered as `None`.
-
- :param section: the section from the config
- :param key: the key defined in the given section
- :param fallback: fallback value when no config value is given,
defaults to None
- :raises AirflowConfigException: raised because ValueError or
OverflowError
- :return: datetime.timedelta(seconds=<config_value>) or None
- """
- val = self.get(section, key, fallback=fallback, _extra_stacklevel=1,
**kwargs)
-
- if val:
- # the given value must be convertible to integer
- try:
- int_val = int(val)
- except ValueError:
- raise AirflowConfigException(
- f'Failed to convert value to int. Please check "{key}" key
in "{section}" section. '
- f'Current value: "{val}".'
- )
-
- try:
- return datetime.timedelta(seconds=int_val)
- except OverflowError as err:
- raise AirflowConfigException(
- f"Failed to convert value to timedelta in `seconds`. "
- f"{err}. "
- f'Please check "{key}" key in "{section}" section. Current
value: "{val}".'
- )
-
- return fallback
-
- def read(
- self,
- filenames: str | bytes | os.PathLike | Iterable[str | bytes |
os.PathLike],
- encoding=None,
- ):
- super().read(filenames=filenames, encoding=encoding)
-
- def read_dict( # type: ignore[override]
- self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
- ):
- """
- We define a different signature here to add better type hints and
checking.
-
- :param dictionary: dictionary to read from
- :param source: source to be used to store the configuration
- :return:
- """
- super().read_dict(dictionary=dictionary, source=source)
-
- def has_option(self, section: str, option: str, lookup_from_deprecated:
bool = True) -> bool:
- """
- Check if option is defined.
-
- Uses self.get() to avoid reimplementing the priority order of config
variables
- (env, config, cmd, defaults).
-
- :param section: section to get option from
- :param option: option to get
- :param lookup_from_deprecated: If True, check if the option is defined
in deprecated sections
- :return:
- """
- try:
- value = self.get(
- section,
- option,
- fallback=None,
- _extra_stacklevel=1,
- suppress_warnings=True,
- lookup_from_deprecated=lookup_from_deprecated,
- )
- if value is None:
- return False
- return True
- except (NoOptionError, NoSectionError):
- return False
-
- def set(self, section: str, option: str, value: str | None = None) -> None:
- """
- Set an option to the given value.
-
- This override just makes sure the section and option are lower case,
to match what we do in `get`.
- """
- section = section.lower()
- option = option.lower()
- defaults = self.configuration_description or {}
- if not self.has_section(section) and section in defaults:
- # Trying to set a key in a section that exists in default, but not
in the user config;
- # automatically create it
- self.add_section(section)
- super().set(section, option, value)
-
- def remove_option(self, section: str, option: str, remove_default: bool =
True):
- """
- Remove an option if it exists in config from a file or default config.
-
- If both of config have the same option, this removes the option
- in both configs unless remove_default=False.
- """
- section = section.lower()
- option = option.lower()
- if super().has_option(section, option):
- super().remove_option(section, option)
+ # All parsing methods are now provided by the shared base class.
+ # This includes: get, getboolean, getint, getfloat, getlist, getenum,
getenumlist,
+ # getjson, gettimedelta, getimport, get_mandatory_value,
get_mandatory_list_value,
+ # has_option, set, remove_option, and helper methods like
_get_option_from_*.
+ # The shared base class's getimport() correctly uses self.get() instead of
conf.get().
Review Comment:
Sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]