chen0427ok commented on code in PR #59291:
URL: https://github.com/apache/airflow/pull/59291#discussion_r2611576942
##########
task-sdk/src/airflow/sdk/definitions/param.py:
##########
@@ -64,13 +81,67 @@ def __init__(
self.description = description
self.schema = kwargs.pop("schema") if "schema" in kwargs else kwargs
self.source = source
+ self.config_source = config_source
+
+ # If config_source is provided, load options and add to schema as enum
+ if config_source:
+ try:
+ options = self._load_options_from_config(config_source)
+ if options:
+ self.schema["enum"] = options
+ except Exception as e:
+ logger.warning(
+ "Failed to load options from config_source: %s. "
+ "Parameter will be treated as a regular string field.",
+ e,
+ )
def __copy__(self) -> Param:
return Param(
self.value,
self.description,
schema=self.schema,
source=self.source,
+ config_source=self.config_source,
+ )
+
+ def _load_options_from_config(self, config_source: dict[str, Any]) ->
list[str]:
+ """
+ Load dropdown options from an external configuration file.
+
+ :param config_source: Dictionary containing configuration for loading
options.
+ Must include 'file' key with path to config file.
+ Optional keys: 'filter' (dict), 'key_field' (str)
+ :return: List of option values loaded from the configuration file
+ :raises FileNotFoundError: If the configuration file is not found
+ :raises ValueError: If config_source is invalid or file format is
unsupported
+ """
+ # Import here to avoid circular dependencies
+ from airflow.sdk.definitions.param_config_loader import (
+ load_options_from_ini,
+ load_options_from_json,
+ load_options_from_yaml,
+ )
+
+ file_path = config_source.get("file")
+ if not file_path:
+ raise ValueError("config_source must include 'file' key with path
to configuration file")
+
+ filter_conditions = config_source.get("filter")
+ key_field = config_source.get("key_field")
+
+ # Detect file type and use appropriate loader
+ if file_path.endswith(".ini"):
+ key_field = key_field or "section"
+ return load_options_from_ini(file_path, filter_conditions,
key_field)
+ if file_path.endswith(".json"):
+ key_field = key_field or "name"
+ return load_options_from_json(file_path, filter_conditions,
key_field)
+ if file_path.endswith((".yaml", ".yml")):
+ key_field = key_field or "name"
+ return load_options_from_yaml(file_path, filter_conditions,
key_field)
+ raise ValueError(
+ f"Unsupported config file type: {file_path}. Supported types are:
.ini, .json, .yaml, .yml"
Review Comment:
Thanks for the detailed review! I've addressed all your feedback:
1. Consolidated the three loader functions into `load_options_from_file()`
with an `extension` parameter
2. Added `ConfigSource` TypedDict for better type hints and IDE support
3. Removed the unnecessary try/except around the yaml import (yaml is already
in the airflow-core dependencies)
4. All tests are passing (19 config loader tests + 4 Param integration tests)
The refactoring follows the DRY principle and makes the code much more
maintainable. Let me know if you need any other changes!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]