This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new 3ce8ff6 [api] Support loading Flink legacy yaml config file (#298)
3ce8ff6 is described below
commit 3ce8ff6df7f25025c225d14f52b620a5a8f65274
Author: Xuannan <[email protected]>
AuthorDate: Wed Nov 5 09:21:40 2025 +0800
[api] Support loading Flink legacy yaml config file (#298)
---
.../runtime/remote_execution_environment.py | 51 +++++-
.../tests/test_remote_execution_environment.py | 181 +++++++++++++++++++++
2 files changed, 228 insertions(+), 4 deletions(-)
diff --git a/python/flink_agents/runtime/remote_execution_environment.py
b/python/flink_agents/runtime/remote_execution_environment.py
index 878c527..7117970 100644
--- a/python/flink_agents/runtime/remote_execution_environment.py
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import logging
import os
from pathlib import Path
from typing import Any, Callable, Dict, List
@@ -42,6 +43,8 @@ from flink_agents.api.resource import ResourceType
from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.plan.configuration import AgentConfiguration
+_CONFIG_FILE_NAME = "config.yaml"
+_LEGACY_CONFIG_FILE_NAME = "flink-conf.yaml"
class RemoteAgentBuilder(AgentBuilder):
"""RemoteAgentBuilder for integrating datastream/table and agent."""
@@ -171,10 +174,7 @@ class
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
self.__env = env
self.__t_env = t_env
self.__config = AgentConfiguration()
- flink_conf_dir = os.environ.get("FLINK_CONF_DIR")
- if flink_conf_dir is not None:
- config_dir = Path(flink_conf_dir) / "config.yaml"
- self.__config.load_from_file(str(config_dir))
+ self.__load_config_from_flink_conf_dir()
@property
def t_env(self) -> StreamTableEnvironment:
@@ -269,6 +269,49 @@ class
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
self.__env.execute()
+ def __load_config_from_flink_conf_dir(self) -> None:
+ """Load agent configuration from FLINK_CONF_DIR if available."""
+ flink_conf_dir = os.environ.get("FLINK_CONF_DIR")
+ if flink_conf_dir is None:
+ return
+
+ # Try to find config file, with fallback to legacy name
+ config_path = self.__find_config_file(flink_conf_dir)
+
+ if config_path is None:
+ logging.error(f"Config file not found in {flink_conf_dir}")
+ else:
+ self.__config.load_from_file(str(config_path))
+
+ def __find_config_file(self, flink_conf_dir: str) -> Path | None:
+ """Find config file in the given directory, checking both new and
legacy names.
+
+ Parameters
+ ----------
+ flink_conf_dir : str
+ Directory to search for config files.
+
+ Returns:
+ -------
+ Path | None
+ Path to the config file if found, None otherwise.
+ """
+ # Try legacy config file name first
+ legacy_config_path =
Path(flink_conf_dir).joinpath(_LEGACY_CONFIG_FILE_NAME)
+ if legacy_config_path.exists():
+ logging.warning(
+ f"Using legacy config file {_LEGACY_CONFIG_FILE_NAME}"
+ )
+ return legacy_config_path
+
+ # Try new config file name as fallback
+ primary_config_path = Path(flink_conf_dir).joinpath(_CONFIG_FILE_NAME)
+ if primary_config_path.exists():
+ return primary_config_path
+
+ return None
+
+
def create_instance(
env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs:
Any
) -> AgentsExecutionEnvironment:
diff --git
a/python/flink_agents/runtime/tests/test_remote_execution_environment.py
b/python/flink_agents/runtime/tests/test_remote_execution_environment.py
new file mode 100644
index 0000000..7cdef48
--- /dev/null
+++ b/python/flink_agents/runtime/tests/test_remote_execution_environment.py
@@ -0,0 +1,181 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+import tempfile
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import yaml
+
+from flink_agents.plan.configuration import AgentConfiguration
+from flink_agents.runtime.remote_execution_environment import (
+ RemoteExecutionEnvironment,
+)
+
+test_data = {
+ "agent": {
+ "database": {
+ "host": "localhost",
+ "port": 5432,
+ },
+ "api": {"endpoint": "/api/v1", "timeout": 30.0},
+ "debug": True,
+ }
+}
+
+
+def test_remote_execution_environment_load_config_file() -> None:
+ """Test RemoteExecutionEnvironment loads config from config.yaml."""
+ # Create a temporary directory with config.yaml
+ with tempfile.TemporaryDirectory() as temp_dir:
+ config_file = Path(temp_dir) / "config.yaml"
+ with config_file.open("w") as f:
+ yaml.dump(test_data, f)
+
+ # Set FLINK_CONF_DIR environment variable
+ original_env = os.environ.get("FLINK_CONF_DIR")
+ try:
+ os.environ["FLINK_CONF_DIR"] = temp_dir
+
+ # Mock StreamExecutionEnvironment
+ mock_stream_env = MagicMock()
+
+ # Create RemoteExecutionEnvironment instance
+ with patch(
+
"flink_agents.runtime.remote_execution_environment.StreamExecutionEnvironment"
+ ):
+ remote_env = RemoteExecutionEnvironment(env=mock_stream_env)
+
+ # Verify that configuration was loaded correctly
+ _verify_config(remote_env.get_config())
+
+ finally:
+ # Restore original environment variable
+ if original_env is None:
+ os.environ.pop("FLINK_CONF_DIR", None)
+ else:
+ os.environ["FLINK_CONF_DIR"] = original_env
+
+
+def test_remote_execution_environment_load_legacy_config_file() -> None:
+ """Test RemoteExecutionEnvironment loads legacy flink-conf.yaml."""
+ # Create a temporary directory with flink-conf.yaml (legacy name)
+ with tempfile.TemporaryDirectory() as temp_dir:
+ legacy_config_file = Path(temp_dir) / "flink-conf.yaml"
+ with legacy_config_file.open("w") as f:
+ yaml.dump(test_data, f)
+
+ # Set FLINK_CONF_DIR environment variable
+ original_env = os.environ.get("FLINK_CONF_DIR")
+ try:
+ os.environ["FLINK_CONF_DIR"] = temp_dir
+
+ # Mock StreamExecutionEnvironment and capture logging
+ mock_stream_env = MagicMock()
+
+ # Capture warning log about using legacy config file
+ with patch(
+
"flink_agents.runtime.remote_execution_environment.StreamExecutionEnvironment"
+ ):
+ with patch(
+ "flink_agents.runtime.remote_execution_environment.logging"
+ ) as mock_logging:
+ # Create RemoteExecutionEnvironment instance
+ remote_env =
RemoteExecutionEnvironment(env=mock_stream_env)
+
+ # Verify that a warning was logged about using legacy
config
+ assert mock_logging.warning.called
+ warning_call_args = mock_logging.warning.call_args[0][0]
+ assert "legacy config file" in warning_call_args.lower()
+ assert "flink-conf.yaml" in warning_call_args
+
+ # Verify that configuration was loaded correctly
+ config = remote_env.get_config()
+ _verify_config(config)
+
+ finally:
+ # Restore original environment variable
+ if original_env is None:
+ os.environ.pop("FLINK_CONF_DIR", None)
+ else:
+ os.environ["FLINK_CONF_DIR"] = original_env
+
+
+def test_remote_execution_environment_prioritizes_legacy_config() -> None:
+ """Test RemoteExecutionEnvironment prioritizes flink-conf.yaml over
config.yaml."""
+ # Create a temporary directory with both config files
+ with tempfile.TemporaryDirectory() as temp_dir:
+ # Create config.yaml with one set of values
+ config_file = Path(temp_dir) / "config.yaml"
+ config_data = {
+ "agent": {
+ "database": {
+ "host": "config-host",
+ "port": 9999,
+ },
+ }
+ }
+ with config_file.open("w") as f:
+ yaml.dump(config_data, f)
+
+ # Create flink-conf.yaml with different values
+ legacy_config_file = Path(temp_dir) / "flink-conf.yaml"
+ legacy_data = {
+ "agent": {
+ "database": {
+ "host": "legacy-host",
+ "port": 1234,
+ },
+ }
+ }
+ with legacy_config_file.open("w") as f:
+ yaml.dump(legacy_data, f)
+
+ # Set FLINK_CONF_DIR environment variable
+ original_env = os.environ.get("FLINK_CONF_DIR")
+ try:
+ os.environ["FLINK_CONF_DIR"] = temp_dir
+
+ # Mock StreamExecutionEnvironment
+ mock_stream_env = MagicMock()
+
+ # Create RemoteExecutionEnvironment instance
+ with patch(
+
"flink_agents.runtime.remote_execution_environment.StreamExecutionEnvironment"
+ ):
+ remote_env = RemoteExecutionEnvironment(env=mock_stream_env)
+
+ # Verify that configuration was loaded from flink-conf.yaml
(legacy)
+ config = remote_env.get_config()
+ assert config.get_str("database.host") == "legacy-host"
+ assert config.get_int("database.port") == 1234
+
+ finally:
+ # Restore original environment variable
+ if original_env is None:
+ os.environ.pop("FLINK_CONF_DIR", None)
+ else:
+ os.environ["FLINK_CONF_DIR"] = original_env
+
+
+def _verify_config(config: AgentConfiguration) -> None:
+ assert config.get_str("database.host") == "localhost"
+ assert config.get_int("database.port") == 5432
+ assert config.get_str("api.endpoint") == "/api/v1"
+ assert config.get_float("api.timeout") == 30.0
+ assert config.get_bool("debug") is True