Sxnan commented on code in PR #122:
URL: https://github.com/apache/flink-agents/pull/122#discussion_r2299896938


##########
python/flink_agents/api/execution_environment.py:
##########
@@ -108,6 +109,16 @@ def get_execution_environment(
                 "flink_agents.runtime.remote_execution_environment"
             ).create_instance(env=env, **kwargs)
 
+    @abstractmethod
+    def get_config(self, path: Optional[str] = None) -> WritableConfiguration:

Review Comment:
   Should we return a Configuration that supports both read and write?



##########
python/flink_agents/plan/configuration.py:
##########
@@ -0,0 +1,224 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+import yaml
+from pydantic import BaseModel
+from pyflink.common import Configuration
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+    ConfigOption,
+    ReadableConfiguration,
+    WritableConfiguration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str, 
Any]:
+    """Flatten a nested dictionary into a single-level dictionary.
+
+    This function recursively traverses the dictionary, converting multi-level
+    nested key-value pairs into a single-level structure, where nested levels
+    are represented by joining key names with the specified separator.
+
+    Args:
+        d (Dict): The nested dictionary to be flattened
+        parent_key (str): The parent key name, used in recursion to track the
+                         upper-level key path. Defaults to an empty string.
+        sep (str): The separator used to join parent and child keys.
+                  Defaults to dot ('.').
+
+    Returns:
+        Dict[str, Any]: A flattened single-level dictionary where keys from
+                       the original nested structure are joined with the 
separator
+    """
+    items = {}
+    for k, v in d.items():
+        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        if isinstance(v, dict):
+            items.update(flatten_dict(v, new_key, sep=sep))
+        else:
+            items[new_key] = v
+    return items
+
+class AgentConfiguration(BaseModel, WritableConfiguration, 
ReadableConfiguration):
+    """Base class for config objects in the system.
+    Provides a flat dict interface to access nested config values.
+    """
+
+    conf_data: Dict[str, Any]
+
+    def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize with optional configuration data."""
+        if conf_data is None:
+            super().__init__(conf_data = {})
+        else:
+            super().__init__(conf_data = conf_data)
+
+    @override
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        value = self.conf_data.get(key)

Review Comment:
   I think we can extract a helper method for all the get_xxx to avoid 
duplicates code



##########
python/flink_agents/plan/configuration.py:
##########
@@ -0,0 +1,224 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+import yaml
+from pydantic import BaseModel
+from pyflink.common import Configuration
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+    ConfigOption,
+    ReadableConfiguration,
+    WritableConfiguration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str, 
Any]:
+    """Flatten a nested dictionary into a single-level dictionary.
+
+    This function recursively traverses the dictionary, converting multi-level
+    nested key-value pairs into a single-level structure, where nested levels
+    are represented by joining key names with the specified separator.
+
+    Args:
+        d (Dict): The nested dictionary to be flattened
+        parent_key (str): The parent key name, used in recursion to track the
+                         upper-level key path. Defaults to an empty string.
+        sep (str): The separator used to join parent and child keys.
+                  Defaults to dot ('.').
+
+    Returns:
+        Dict[str, Any]: A flattened single-level dictionary where keys from
+                       the original nested structure are joined with the 
separator
+    """
+    items = {}
+    for k, v in d.items():
+        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        if isinstance(v, dict):
+            items.update(flatten_dict(v, new_key, sep=sep))
+        else:
+            items[new_key] = v
+    return items
+
+class AgentConfiguration(BaseModel, WritableConfiguration, 
ReadableConfiguration):
+    """Base class for config objects in the system.
+    Provides a flat dict interface to access nested config values.
+    """
+
+    conf_data: Dict[str, Any]
+
+    def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize with optional configuration data."""
+        if conf_data is None:
+            super().__init__(conf_data = {})
+        else:
+            super().__init__(conf_data = conf_data)
+
+    @override
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return int(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_float(self, key: str, default: Optional[float]=None) -> float:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return float(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return bool(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_str(self, key: str, default: Optional[str]=None) -> str:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return str(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get(self, option: ConfigOption) -> Any:
+        value = self.conf_data.get(option.get_key())
+        if value is None:
+            if option.get_default_value() is None:
+                msg = f"Missing key: {option.get_key()}"
+                raise KeyError(msg)
+            return option.get_default_value()
+        try:
+            return option.get_type()(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {option.get_key()}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def set_str(self, key: str, value: str) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_int(self, key: str, value: int) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_float(self, key: str, value: float) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_bool(self, key: str, value: bool) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set(self, option: ConfigOption, value: Any) -> None:
+        self.conf_data[option.get_key()] = value
+
+    def load_from_file(self, config_path: Optional[str] = None) -> None:
+        """Load configuration from a YAML file and update current 
configuration data.
+
+        Args:
+            config_path (str, optional): Path to the configuration file.
+        """
+        if config_path:
+            path = Path(config_path)
+            with path.open() as f:
+                raw_config = yaml.safe_load(f)
+                self.conf_data.update(flatten_dict(raw_config.get('Agent', 
{})))
+
+    def get_conf_data(self) -> dict:
+        """Get the configuration data dictionary.
+
+        Returns:
+            dict: A dictionary containing all configuration items
+        """
+        return self.conf_data
+
+    def revert_to_flink_config(self) -> Configuration:

Review Comment:
   The naming is confusing. Maybe `convert_to_flink_config` or just 
`to_flink_config`



##########
python/flink_agents/plan/configuration.py:
##########
@@ -0,0 +1,224 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+import yaml
+from pydantic import BaseModel
+from pyflink.common import Configuration
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+    ConfigOption,
+    ReadableConfiguration,
+    WritableConfiguration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str, 
Any]:
+    """Flatten a nested dictionary into a single-level dictionary.
+
+    This function recursively traverses the dictionary, converting multi-level
+    nested key-value pairs into a single-level structure, where nested levels
+    are represented by joining key names with the specified separator.
+
+    Args:
+        d (Dict): The nested dictionary to be flattened
+        parent_key (str): The parent key name, used in recursion to track the
+                         upper-level key path. Defaults to an empty string.
+        sep (str): The separator used to join parent and child keys.
+                  Defaults to dot ('.').
+
+    Returns:
+        Dict[str, Any]: A flattened single-level dictionary where keys from
+                       the original nested structure are joined with the 
separator
+    """
+    items = {}
+    for k, v in d.items():
+        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        if isinstance(v, dict):
+            items.update(flatten_dict(v, new_key, sep=sep))
+        else:
+            items[new_key] = v
+    return items
+
+class AgentConfiguration(BaseModel, WritableConfiguration, 
ReadableConfiguration):
+    """Base class for config objects in the system.
+    Provides a flat dict interface to access nested config values.
+    """
+
+    conf_data: Dict[str, Any]
+
+    def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize with optional configuration data."""
+        if conf_data is None:
+            super().__init__(conf_data = {})
+        else:
+            super().__init__(conf_data = conf_data)
+
+    @override
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:

Review Comment:
   After rethinking the behavior of throwing an exception in case of None 
default value, I think such behavior is too strict that we cannot have a 
default value of None. 
   
   I think we can just return the deafult value anyway, and let the caller 
decide whether to throw an exception in case of a None value. 
   
   WDYT?



##########
python/flink_agents/plan/configuration.py:
##########
@@ -0,0 +1,224 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+import yaml
+from pydantic import BaseModel
+from pyflink.common import Configuration
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+    ConfigOption,
+    ReadableConfiguration,
+    WritableConfiguration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str, 
Any]:
+    """Flatten a nested dictionary into a single-level dictionary.
+
+    This function recursively traverses the dictionary, converting multi-level
+    nested key-value pairs into a single-level structure, where nested levels
+    are represented by joining key names with the specified separator.
+
+    Args:
+        d (Dict): The nested dictionary to be flattened
+        parent_key (str): The parent key name, used in recursion to track the
+                         upper-level key path. Defaults to an empty string.
+        sep (str): The separator used to join parent and child keys.
+                  Defaults to dot ('.').
+
+    Returns:
+        Dict[str, Any]: A flattened single-level dictionary where keys from
+                       the original nested structure are joined with the 
separator
+    """
+    items = {}
+    for k, v in d.items():
+        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        if isinstance(v, dict):
+            items.update(flatten_dict(v, new_key, sep=sep))
+        else:
+            items[new_key] = v
+    return items
+
+class AgentConfiguration(BaseModel, WritableConfiguration, 
ReadableConfiguration):
+    """Base class for config objects in the system.
+    Provides a flat dict interface to access nested config values.
+    """
+
+    conf_data: Dict[str, Any]
+
+    def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize with optional configuration data."""
+        if conf_data is None:
+            super().__init__(conf_data = {})
+        else:
+            super().__init__(conf_data = conf_data)
+
+    @override
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return int(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_float(self, key: str, default: Optional[float]=None) -> float:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return float(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return bool(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_str(self, key: str, default: Optional[str]=None) -> str:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return str(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get(self, option: ConfigOption) -> Any:
+        value = self.conf_data.get(option.get_key())
+        if value is None:
+            if option.get_default_value() is None:
+                msg = f"Missing key: {option.get_key()}"
+                raise KeyError(msg)
+            return option.get_default_value()
+        try:
+            return option.get_type()(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {option.get_key()}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def set_str(self, key: str, value: str) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_int(self, key: str, value: int) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_float(self, key: str, value: float) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_bool(self, key: str, value: bool) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set(self, option: ConfigOption, value: Any) -> None:
+        self.conf_data[option.get_key()] = value
+
+    def load_from_file(self, config_path: Optional[str] = None) -> None:
+        """Load configuration from a YAML file and update current 
configuration data.
+
+        Args:
+            config_path (str, optional): Path to the configuration file.
+        """
+        if config_path:
+            path = Path(config_path)
+            with path.open() as f:
+                raw_config = yaml.safe_load(f)
+                self.conf_data.update(flatten_dict(raw_config.get('Agent', 
{})))
+
+    def get_conf_data(self) -> dict:
+        """Get the configuration data dictionary.
+
+        Returns:
+            dict: A dictionary containing all configuration items
+        """
+        return self.conf_data
+
+    def revert_to_flink_config(self) -> Configuration:
+        """Revert LocalConfiguration to Flink configuration."""
+        flink_config = Configuration()
+        for key in self.conf_data:
+            value = self.conf_data[key]
+            if isinstance(value, bool):
+                flink_config.set_boolean(key, value)
+            elif isinstance(value, int):
+                flink_config.set_integer(key, value)
+            elif isinstance(value, float):
+                flink_config.set_float(key, value)
+            elif isinstance(value, str):
+                flink_config.set_string(key, value)
+            else:
+                flink_config.set_string(key, str(value))
+        return flink_config
+
+    def get_config_data_by_prefix(self, prefix: str) -> dict:
+        """Extract configuration items for a specific module from the 
configuration

Review Comment:
   The comment is inaccurate. There should not be a notion of 'module'



##########
python/flink_agents/plan/resource_provider.py:
##########
@@ -88,11 +92,21 @@ class PythonResourceProvider(ResourceProvider):
     clazz: str
     kwargs: Dict[str, Any]
 
-    def provide(self, get_resource: Callable) -> Resource:
+    def provide(self, get_resource: Callable, config: AgentConfiguration) -> 
Resource:
         """Create resource in runtime."""
         module = importlib.import_module(self.module)
         cls = getattr(module, self.clazz)
-        resource = cls(**self.kwargs, get_resource=get_resource)
+
+        final_kwargs = {}
+
+        clazz_kwargs = config.get_config_data_by_prefix(self.clazz)

Review Comment:
   Maybe 'resource_class_config' and `resource_config` make more sense



##########
runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java:
##########
@@ -43,8 +48,22 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
 
     private final StreamExecutionEnvironment env;
 
+    private final AgentConfiguration config;
+
     public RemoteExecutionEnvironment(StreamExecutionEnvironment env) {
         this.env = env;
+        final String configDir = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

Review Comment:
   Add tests for configuration loading



##########
python/flink_agents/runtime/local_execution_environment.py:
##########
@@ -86,6 +88,13 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
     __output: List[Any] = None
     __runner: LocalRunner = None
     __executed: bool = False
+    __config: AgentConfiguration = AgentConfiguration()
+
+    def get_config(self, path: Optional[str] = None) -> AgentConfiguration:
+        """Get configuration of execution environment."""
+        if path is not None:
+            self.__config.load_from_file(path)

Review Comment:
   The load_from_file only looks for configurations under key `agent`. For 
local configuration, do we need to put all the configuration under `agent`?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java:
##########
@@ -55,27 +74,39 @@ public AgentBuilder fromList(List<Object> input) {
 
     @Override
     public <T, K> AgentBuilder fromDataStream(DataStream<T> input, 
KeySelector<T, K> keySelector) {
-        return new RemoteAgentBuilder<>(input, keySelector, env);
+        return new RemoteAgentBuilder<>(input, keySelector, env, config);
     }
 
     @Override
     public <K> AgentBuilder fromTable(
             Table input, StreamTableEnvironment tableEnv, KeySelector<Object, 
K> keySelector) {
-        return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env);
+        return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env, 
config);
     }
 
     @Override
     public void execute() throws Exception {
         env.execute();
     }
 
+    private static Map<String, Object> loadAgentConfiguration(String 
configDir) {
+        try {
+            return (Map<String, Object>)

Review Comment:
   Nit: unchecked cast warning



##########
plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java:
##########
@@ -119,6 +120,23 @@ public AgentPlan deserialize(JsonParser parser, 
DeserializationContext ctx)
             }
         }
 
-        return new AgentPlan(actions, actionsByEvent, resourceProviders);
+        // Deserialize config data

Review Comment:
   Do we test for the serde of the configuration?



##########
python/flink_agents/runtime/remote_execution_environment.py:
##########
@@ -202,6 +223,8 @@ def from_list(self, input: List[Dict[str, Any]]) -> 
"AgentsExecutionEnvironment"
 
     def execute(self) -> None:
         """Execute agent."""
+        flink_config = self.__config.revert_to_flink_config()
+        self.__env.configure(flink_config)

Review Comment:
   Why do we need to configure Flink configuration? 



##########
plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.plan;
+
+import org.apache.flink.agents.api.configuration.ConfigOption;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
+import org.apache.flink.agents.api.configuration.WritableConfiguration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Agent configuration which stores key/value pairs. */
+public class AgentConfiguration implements ReadableConfiguration, 
WritableConfiguration {
+    private final Map<String, Object> confData;
+
+    public AgentConfiguration() {
+        this.confData = new HashMap<>();
+    }
+
+    public AgentConfiguration(Map<String, Object> confData) {
+        this.confData = flatten(confData, "", ".");
+    }
+
+    public Map<String, Object> getConfData() {
+        return confData;
+    }
+
+    @Override
+    public void setStr(String key, String value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setInt(String key, int value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setFloat(String key, float value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setBool(String key, boolean value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public <T> void set(ConfigOption<T> option, T value) {
+        if (value == null && option.getDefaultValue() != null) {
+            return;
+        }
+        confData.put(option.getKey(), value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int getInt(String key, Integer defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Integer::parseInt)
+                .orElse(defaultValue);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public float getFloat(String key, Float defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Float::parseFloat)
+                .orElse(defaultValue);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean getBool(String key, Boolean defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Boolean::valueOf)
+                .orElse(defaultValue);
+    }
+
+    @Override
+    public String getStr(String key, String defaultValue) {
+        return 
Optional.ofNullable(confData.get(key)).map(Object::toString).orElse(defaultValue);
+    }
+
+    @Override
+    public <T> T get(ConfigOption<T> option) {
+        Object rawValue = confData.get(option.getKey());
+        if (rawValue == null) {
+            return option.getDefaultValue();
+        }
+
+        Class<T> targetType = option.getType();
+
+        if (targetType.isAssignableFrom(rawValue.getClass())) {
+            return targetType.cast(rawValue);
+        } else if (String.class.equals(targetType)) {
+            return targetType.cast(rawValue.toString());
+        } else if (Integer.class.equals(targetType)) {
+            return targetType.cast(Integer.parseInt(rawValue.toString()));
+        } else if (Float.class.equals(targetType)) {
+            return targetType.cast(Float.parseFloat(rawValue.toString()));
+        } else if (Boolean.class.equals(targetType)) {
+            return targetType.cast(Boolean.parseBoolean(rawValue.toString()));
+        } else {
+            throw new ClassCastException(
+                    "Unsupported type conversion from "
+                            + rawValue.getClass().getName()
+                            + " to "
+                            + targetType.getName());
+        }
+    }
+
+    private static Map<String, Object> flatten(
+            Map<String, Object> config, String keyPrefix, String keySeparator) 
{
+        final Map<String, Object> flattenedMap = new HashMap<>();
+
+        config.forEach(
+                (key, value) -> {
+                    String flattenedKey = keyPrefix + key;
+                    if (value instanceof Map) {
+                        Map<String, Object> e = (Map<String, Object>) value;

Review Comment:
   Nit: We have an unchecked cast warning here



##########
runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java:
##########
@@ -109,4 +110,10 @@ public Resource getResource(String name, ResourceType 
type) throws Exception {
         }
         return agentPlan.getResource(name, type);
     }
+
+    @Override
+    public ReadableConfiguration getConfig() {
+        System.out.println(agentPlan.getConfigData());

Review Comment:
   Should this be removed?



##########
api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java:
##########
@@ -86,6 +87,13 @@ public static AgentsExecutionEnvironment 
getExecutionEnvironment() {
         return getExecutionEnvironment(null);
     }
 
+    /**
+     * Returns a writable configuration object for setting configuration 
values.
+     *
+     * @return the WritableConfiguration instance used to modify configuration 
settings
+     */
+    public abstract WritableConfiguration getConfig();

Review Comment:
   Maybe return a readable and writable?



##########
python/flink_agents/plan/configuration.py:
##########
@@ -0,0 +1,224 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+import yaml
+from pydantic import BaseModel
+from pyflink.common import Configuration
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+    ConfigOption,
+    ReadableConfiguration,
+    WritableConfiguration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str, 
Any]:
+    """Flatten a nested dictionary into a single-level dictionary.
+
+    This function recursively traverses the dictionary, converting multi-level
+    nested key-value pairs into a single-level structure, where nested levels
+    are represented by joining key names with the specified separator.
+
+    Args:
+        d (Dict): The nested dictionary to be flattened
+        parent_key (str): The parent key name, used in recursion to track the
+                         upper-level key path. Defaults to an empty string.
+        sep (str): The separator used to join parent and child keys.
+                  Defaults to dot ('.').
+
+    Returns:
+        Dict[str, Any]: A flattened single-level dictionary where keys from
+                       the original nested structure are joined with the 
separator
+    """
+    items = {}
+    for k, v in d.items():
+        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        if isinstance(v, dict):
+            items.update(flatten_dict(v, new_key, sep=sep))
+        else:
+            items[new_key] = v
+    return items
+
+class AgentConfiguration(BaseModel, WritableConfiguration, 
ReadableConfiguration):
+    """Base class for config objects in the system.
+    Provides a flat dict interface to access nested config values.
+    """
+
+    conf_data: Dict[str, Any]
+
+    def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize with optional configuration data."""
+        if conf_data is None:
+            super().__init__(conf_data = {})
+        else:
+            super().__init__(conf_data = conf_data)
+
+    @override
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return int(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_float(self, key: str, default: Optional[float]=None) -> float:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return float(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return bool(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_str(self, key: str, default: Optional[str]=None) -> str:
+        value = self.conf_data.get(key)
+        if value is None:
+            if default is None:
+                msg = f"Missing key: {key}"
+                raise KeyError(msg)
+            return default
+
+        try:
+            return str(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get(self, option: ConfigOption) -> Any:
+        value = self.conf_data.get(option.get_key())
+        if value is None:
+            if option.get_default_value() is None:
+                msg = f"Missing key: {option.get_key()}"
+                raise KeyError(msg)
+            return option.get_default_value()
+        try:
+            return option.get_type()(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {option.get_key()}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def set_str(self, key: str, value: str) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_int(self, key: str, value: int) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_float(self, key: str, value: float) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_bool(self, key: str, value: bool) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set(self, option: ConfigOption, value: Any) -> None:
+        self.conf_data[option.get_key()] = value
+
+    def load_from_file(self, config_path: Optional[str] = None) -> None:
+        """Load configuration from a YAML file and update current 
configuration data.
+
+        Args:
+            config_path (str, optional): Path to the configuration file.
+        """
+        if config_path:
+            path = Path(config_path)
+            with path.open() as f:
+                raw_config = yaml.safe_load(f)
+                self.conf_data.update(flatten_dict(raw_config.get('Agent', 
{})))

Review Comment:
   Maybe use the lower case `agent`



##########
plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.plan;
+
+import org.apache.flink.agents.api.configuration.ConfigOption;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
+import org.apache.flink.agents.api.configuration.WritableConfiguration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Agent configuration which stores key/value pairs. */
+public class AgentConfiguration implements ReadableConfiguration, 
WritableConfiguration {
+    private final Map<String, Object> confData;
+
+    public AgentConfiguration() {
+        this.confData = new HashMap<>();
+    }
+
+    public AgentConfiguration(Map<String, Object> confData) {
+        this.confData = flatten(confData, "", ".");
+    }
+
+    public Map<String, Object> getConfData() {
+        return confData;
+    }
+
+    @Override
+    public void setStr(String key, String value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setInt(String key, int value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setFloat(String key, float value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setBool(String key, boolean value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public <T> void set(ConfigOption<T> option, T value) {
+        if (value == null && option.getDefaultValue() != null) {
+            return;
+        }
+        confData.put(option.getKey(), value);
+    }
+
+    @SuppressWarnings("unchecked")

Review Comment:
   Why do we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to