This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new ed04111a [api][test] Simplify cross-language Java action declaration
and add e2e coverage (#827)
ed04111a is described below
commit ed04111af97970ee2d9d43094fc84f6390fe0423
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jun 10 14:44:49 2026 +0800
[api][test] Simplify cross-language Java action declaration and add e2e
coverage (#827)
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
docs/content/docs/development/workflow_agent.md | 11 +-
.../test/YamlActionsInPythonCrossLanguageTest.java | 113 ++++++++++++++++
.../yaml_cross_language_actions_in_python.yaml | 47 +++++++
python/flink_agents/api/function.py | 24 +++-
.../api/tests/test_agent_add_action.py | 18 +--
python/flink_agents/api/tests/test_decorators.py | 9 +-
python/flink_agents/api/yaml/loader.py | 16 +--
.../python_agent_with_java_action.py | 10 +-
.../yaml_actions_in_java_cross_language_test.py | 148 +++++++++++++++++++++
.../yaml_java_action_cross_language_test.py | 107 +++++++++++++++
.../yaml_cross_language_actions_in_java.yaml | 49 +++++++
.../resources/yaml_cross_language_java_action.yaml | 15 +++
python/flink_agents/plan/tests/test_agent_plan.py | 9 +-
.../plan/tests/test_agent_plan_cross_language.py | 9 +-
.../tests/test_local_runner_cross_language.py | 9 +-
15 files changed, 527 insertions(+), 67 deletions(-)
diff --git a/docs/content/docs/development/workflow_agent.md
b/docs/content/docs/development/workflow_agent.md
index d15b6e92..b8b7e860 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -543,14 +543,9 @@ from flink_agents.api.function import JavaFunction
class MyAgent(Agent):
@action(
InputEvent.EVENT_TYPE,
- target=JavaFunction(
- qualname="com.example.MyHandlers",
- method_name="handleInput",
- parameter_types=[
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
- ],
- ),
+ # Action signatures are fixed (Event, RunnerContext), so for_action
+ # fills the Java parameter types for you — only the class and method.
+ target=JavaFunction.for_action("com.example.MyHandlers",
"handleInput"),
)
@staticmethod
def handle_input(event: Event, ctx: RunnerContext) -> None:
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java
new file mode 100644
index 00000000..44412980
--- /dev/null
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent.OLLAMA_MODEL;
+import static
org.apache.flink.agents.resource.test.CrossLanguageTestPreparationUtils.pullModel;
+
+/**
+ * End-to-end test for the Java YAML loader with cross-language orchestration
actions: a Java host
+ * loads an agent whose user actions {@code process_input} / {@code
process_chat_response} are
+ * {@code type: python}, while the Java-native built-in chat loop bridges
ChatRequest→Response.
+ * Companion to {@link YamlCrossLanguageTest} (Java actions + Python tool) —
here the actions cross
+ * languages and the built-ins stay native. The math path still drives a
cross-language Python tool
+ * ({@code calculate_bmi}).
+ */
+public class YamlActionsInPythonCrossLanguageTest {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(YamlActionsInPythonCrossLanguageTest.class);
+
+ private final boolean ollamaReady;
+
+ public YamlActionsInPythonCrossLanguageTest() throws IOException {
+ ollamaReady = pullModel(OLLAMA_MODEL);
+ }
+
+ @Test
+ public void testYamlPythonActionsOnJavaHost() throws Exception {
+ Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not
provided");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<String> inputStream =
+ env.fromData(
+ "Calculate BMI for someone who is 1.75 meters tall and
weighs 70 kg",
+ "Tell me a joke about cats.");
+
+ AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
agentsEnv.loadYaml(yamlFixture("yaml_cross_language_actions_in_python.yaml"));
+
+ DataStream<Object> outputStream =
+ agentsEnv
+ .fromDataStream(
+ inputStream, (KeySelector<String, String>)
value -> "orderKey")
+ .apply("yaml_actions_in_python_agent")
+ .toDataStream();
+
+ CloseableIterator<Object> results = outputStream.collectAsync();
+ agentsEnv.execute();
+
+ List<String> responses = new ArrayList<>();
+ while (results.hasNext()) {
+ responses.add(String.valueOf(results.next()));
+ }
+ LOG.info("Python-action cross-language YAML agent responses: {}",
responses);
+
+ Assertions.assertEquals(
+ 2, responses.size(), "expected 2 responses, got " +
responses.size());
+
+ String joined = String.join("\n", responses).toLowerCase();
+ Assertions.assertTrue(
+ joined.contains("22"), String.format("math answer missing
'22': %s", responses));
+ Assertions.assertTrue(
+ joined.contains("cat"),
+ String.format("creative answer missing 'cat': %s", responses));
+ }
+
+ private static Path yamlFixture(String name) {
+ URL resource =
+ YamlActionsInPythonCrossLanguageTest.class
+ .getClassLoader()
+ .getResource("yaml/" + name);
+ Objects.requireNonNull(resource, "fixture not found on classpath:
yaml/" + name);
+ return Paths.get(resource.getPath());
+ }
+}
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml
new file mode 100644
index 00000000..61548980
--- /dev/null
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml
@@ -0,0 +1,47 @@
+agents:
+ - name: yaml_actions_in_python_agent
+ description: |
+ Java-host cross-language YAML e2e agent whose orchestration actions are
+ Python. Complements yaml_cross_language_agent (Java actions + Python
tool):
+ here ``process_input`` / ``process_chat_response`` are ``type: python``
and
+ dispatch to ``yaml_cross_language_actions``, while the Java-native
built-in
+ chat loop bridges ChatRequest→Response. The math path still exercises a
+ cross-language Python tool (calculate_bmi).
+
+ chat_model_connections:
+ - name: ollama_connection_java
+ clazz: ollama
+ type: java
+ endpoint: http://localhost:11434
+ requestTimeout: 240
+ - name: ollama_connection_python
+ clazz: ollama
+ request_timeout: 240
+
+ chat_model_setups:
+ - name: math_chat_model
+ clazz: ollama
+ type: java
+ connection: ollama_connection_java
+ model: qwen3:1.7b
+ tools: [calculate_bmi]
+ extract_reasoning: true
+ - name: creative_chat_model
+ clazz: ollama
+ connection: ollama_connection_python
+ model: qwen3:1.7b
+ extract_reasoning: true
+
+ tools:
+ - name: calculate_bmi
+ function:
flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:calculate_bmi
+
+ actions:
+ - name: process_input
+ type: python
+ function:
flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:process_input
+ listen_to: [input]
+ - name: process_chat_response
+ type: python
+ function:
flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:process_chat_response
+ listen_to: [chat_response]
diff --git a/python/flink_agents/api/function.py
b/python/flink_agents/api/function.py
index b5597664..d95375a4 100644
--- a/python/flink_agents/api/function.py
+++ b/python/flink_agents/api/function.py
@@ -25,10 +25,18 @@ method name, and parameter types for Java.
import importlib
import inspect
from abc import ABC
-from typing import Any, Callable, List
+from typing import Any, Callable, List, Tuple
from pydantic import BaseModel, model_serializer
+#: Java parameter types of an action method. Action signatures are fixed
+#: ``(Event, RunnerContext)``, so callers never have to spell them out. A tuple
+#: keeps the shared constant immutable; callers copy it into their own list.
+ACTION_PARAMETER_TYPES: Tuple[str, ...] = (
+ "org.apache.flink.agents.api.Event",
+ "org.apache.flink.agents.api.context.RunnerContext",
+)
+
class Function(BaseModel, ABC):
"""Marker base class for function descriptors. Pure data — has no
@@ -99,6 +107,20 @@ class JavaFunction(Function):
method_name: str
parameter_types: List[str]
+ @classmethod
+ def for_action(cls, qualname: str, method_name: str) -> "JavaFunction":
+ """Build a descriptor for a Java action, filling the fixed signature.
+
+ Actions always take ``(Event, RunnerContext)``, so ``parameter_types``
+ is implied — mirrors the YAML API, which omits it for ``type: java``
+ actions. Tools must still pass ``parameter_types`` to the constructor.
+ """
+ return cls(
+ qualname=qualname,
+ method_name=method_name,
+ parameter_types=list(ACTION_PARAMETER_TYPES),
+ )
+
@model_serializer
def __serialize(self) -> dict[str, Any]:
return {
diff --git a/python/flink_agents/api/tests/test_agent_add_action.py
b/python/flink_agents/api/tests/test_agent_add_action.py
index 98b448b0..ee7e59f2 100644
--- a/python/flink_agents/api/tests/test_agent_add_action.py
+++ b/python/flink_agents/api/tests/test_agent_add_action.py
@@ -30,14 +30,16 @@ def _dummy_action(event: Event, ctx: RunnerContext) -> None:
def _make_java_function() -> JavaFunction:
- return JavaFunction(
- qualname="com.example.Handlers",
- method_name="handle",
- parameter_types=[
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
- ],
- )
+ return JavaFunction.for_action("com.example.Handlers", "handle")
+
+
+def test_java_function_for_action_fills_fixed_action_signature() -> None:
+ """``for_action`` omits parameter_types boilerplate; YAML already does
this."""
+ jf = JavaFunction.for_action("com.example.Handlers", "handle")
+ assert jf.parameter_types == [
+ "org.apache.flink.agents.api.Event",
+ "org.apache.flink.agents.api.context.RunnerContext",
+ ]
# ── Descriptor pass-through ─────────────────────────────────────────────
diff --git a/python/flink_agents/api/tests/test_decorators.py
b/python/flink_agents/api/tests/test_decorators.py
index 6a4b131f..94ff8d86 100644
--- a/python/flink_agents/api/tests/test_decorators.py
+++ b/python/flink_agents/api/tests/test_decorators.py
@@ -94,14 +94,7 @@ def test_action_decorator_rejects_invalid_types() -> None:
def _java_target() -> JavaFunction:
- return JavaFunction(
- qualname="com.example.Handlers",
- method_name="handle",
- parameter_types=[
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
- ],
- )
+ return JavaFunction.for_action("com.example.Handlers", "handle")
def test_action_decorator_with_cross_language_target() -> None:
diff --git a/python/flink_agents/api/yaml/loader.py
b/python/flink_agents/api/yaml/loader.py
index 2a0ad28d..54e5938e 100644
--- a/python/flink_agents/api/yaml/loader.py
+++ b/python/flink_agents/api/yaml/loader.py
@@ -29,7 +29,12 @@ import yaml
from flink_agents.api.agents.agent import Agent
from flink_agents.api.chat_message import ChatMessage, MessageRole
-from flink_agents.api.function import Function, JavaFunction, PythonFunction
+from flink_agents.api.function import (
+ ACTION_PARAMETER_TYPES,
+ Function,
+ JavaFunction,
+ PythonFunction,
+)
from flink_agents.api.prompts.prompt import Prompt
from flink_agents.api.resource import ResourceDescriptor, ResourceType
from flink_agents.api.skills import Skills, SkillSourceSpec
@@ -50,13 +55,6 @@ from flink_agents.api.yaml.specs import (
YamlAgentsDocument,
)
-# Default Java parameter types for an action. Action methods in
-# flink-agents always have signature (Event, RunnerContext).
-_JAVA_ACTION_PARAMETER_TYPES: list[str] = [
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
-]
-
_DESCRIPTOR_TYPES: Dict[str, ResourceType] = {
"chat_model_connections": ResourceType.CHAT_MODEL_CONNECTION,
"chat_model_setups": ResourceType.CHAT_MODEL,
@@ -159,7 +157,7 @@ def _add_descriptors_to_agent(
def _resolve_action_function(action: ActionSpec) -> Function:
- parameter_types = _JAVA_ACTION_PARAMETER_TYPES if action.type == "java"
else None
+ parameter_types = ACTION_PARAMETER_TYPES if action.type == "java" else None
return resolve_function(
name=action.name,
function=action.function,
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
index a59b46fe..f7fb2160 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
@@ -25,10 +25,6 @@ from flink_agents.api.function import JavaFunction
JAVA_HANDLER_QUALNAME =
"org.apache.flink.agents.resource.test.JavaActionHandler"
JAVA_HANDLER_METHOD = "multiplyByTwo"
-JAVA_HANDLER_PARAMETER_TYPES = [
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
-]
class PythonAgentWithJavaActionAgent(Agent):
@@ -40,11 +36,7 @@ class PythonAgentWithJavaActionAgent(Agent):
self.add_action(
name="multiply_by_two",
events=[InputEvent.EVENT_TYPE],
- func=JavaFunction(
- qualname=JAVA_HANDLER_QUALNAME,
- method_name=JAVA_HANDLER_METHOD,
- parameter_types=JAVA_HANDLER_PARAMETER_TYPES,
- ),
+ func=JavaFunction.for_action(JAVA_HANDLER_QUALNAME,
JAVA_HANDLER_METHOD),
)
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py
new file mode 100644
index 00000000..770d8505
--- /dev/null
+++
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py
@@ -0,0 +1,148 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""E2E test: Python-host agent whose orchestration actions are Java.
+
+Companion to ``yaml_cross_language_test.py`` (Python actions + Java tool).
+Here ``process_input`` / ``process_chat_response`` are ``type: java`` and
+dispatch to ``YamlCrossLanguageActions``; the Python-native built-in
+``chat_model_action`` bridges the chat loop, and the math path calls the
+cross-language Java ``calculateBMI`` tool. Exercises a Java user action →
+Python built-in → Java user action round trip.
+
+Skipped when the Ollama client/model or the cross-language test-jar is
+unavailable.
+"""
+
+import os
+import sysconfig
+from pathlib import Path
+
+import pytest
+from pyflink.common import Configuration, Encoder, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
+from pyflink.datastream.connectors.file_system import (
+ FileSource,
+ StreamFormat,
+ StreamingFileSink,
+)
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.e2e_tests.test_utils import (
+ assert_tool_invoked,
+ collect_tool_invocations,
+ pull_model,
+)
+
+current_dir = Path(__file__).parent
+_RESOURCES = current_dir.parent / "resources"
+_REPO_ROOT = current_dir.parent.parent.parent.parent
+_TEST_JAR = (
+ _REPO_ROOT
+ / "e2e-test"
+ / "flink-agents-end-to-end-tests-resource-cross-language"
+ / "target"
+ /
"flink-agents-end-to-end-tests-resource-cross-language-0.3-SNAPSHOT-tests.jar"
+)
+
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b")
+_client = pull_model(OLLAMA_MODEL)
+
+
[email protected](
+ _client is None,
+ reason="Ollama client is not available or test model is missing.",
+)
[email protected](
+ not _TEST_JAR.is_file(),
+ reason=(
+ "Cross-language test-jar is missing; run "
+ "'mvn package -DskipTests -pl e2e-test/"
+ "flink-agents-end-to-end-tests-resource-cross-language' first."
+ ),
+)
+def test_yaml_python_host_with_java_actions(
+ tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+) -> None:
+ """``load_yaml`` agent whose process_* actions are Java, bridged by
Python."""
+ monkeypatch.setenv("OLLAMA_CHAT_MODEL", OLLAMA_MODEL)
+ config = Configuration()
+ config.set_string("python.pythonpath", sysconfig.get_paths()["purelib"])
+ env = StreamExecutionEnvironment.get_execution_environment(config)
+ env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+ env.set_parallelism(1)
+ env.add_jars(f"file://{_TEST_JAR}")
+
+ input_datastream = env.from_source(
+ source=FileSource.for_record_stream_format(
+ StreamFormat.text_line_format(),
+ f"file:///{_RESOURCES}/yaml_cross_language_input",
+ ).build(),
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="yaml_actions_in_java_source",
+ )
+ deserialize_datastream = input_datastream.map(lambda x: str(x))
+
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+ log_dir = tmp_path / "event_logs"
+ log_dir.mkdir(parents=True, exist_ok=True)
+ agents_env.get_config().set_str("baseLogDir", str(log_dir))
+ agents_env.load_yaml(_RESOURCES /
"yaml_cross_language_actions_in_java.yaml")
+
+ output_datastream = (
+ agents_env.from_datastream(
+ input=deserialize_datastream, key_selector=lambda x: "orderKey"
+ )
+ .apply("yaml_actions_in_java_agent")
+ .to_datastream()
+ )
+
+ result_dir = tmp_path / "results"
+ result_dir.mkdir(parents=True, exist_ok=True)
+ output_datastream.map(
+ lambda x: str(x).replace("\n", "").replace("\r", ""), Types.STRING()
+ ).add_sink(
+ StreamingFileSink.for_row_format(
+ base_path=str(result_dir.absolute()),
+ encoder=Encoder.simple_string_encoder(),
+ ).build()
+ )
+
+ agents_env.execute()
+
+ actual_result = []
+ for file in result_dir.iterdir():
+ if file.is_dir():
+ for child in file.iterdir():
+ with child.open() as f:
+ actual_result.extend(f.readlines())
+ if file.is_file():
+ with file.open() as f:
+ actual_result.extend(f.readlines())
+
+ # Math path went through the Java calculateBMI tool — cross-language tool.
+ assert_tool_invoked(
+ collect_tool_invocations(log_dir),
+ "calculateBMI",
+ {"weightKg": 70, "heightM": 1.75},
+ )
+ # Creative path uses no tool; its answer mentions a cat.
+ joined = "\n".join(actual_result).lower()
+ assert "cat" in joined, f"creative answer missing 'cat': {actual_result!r}"
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py
new file mode 100644
index 00000000..37303c0f
--- /dev/null
+++
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py
@@ -0,0 +1,107 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""E2E test: a YAML-declared agent whose action body is a Java method.
+
+Companion to ``yaml_cross_language_test.py`` (Java *tool*) — this covers
+the Java *action* path. Parses
``resources/yaml_cross_language_java_action.yaml``
+via ``AgentsExecutionEnvironment.load_yaml`` and runs the declared agent.
+The single ``type: java`` action dispatches into
+``org.apache.flink.agents.resource.test.JavaActionHandler.multiplyByTwo``,
+mirroring ``python_agent_with_java_action_test`` but driven by YAML. The
+YAML omits ``parameter_types`` (actions have a fixed signature), so this
+also guards the loader's auto-fill. No chat model is involved, so it runs
+whenever the cross-language test-jar is present.
+"""
+
+import os
+import sysconfig
+from pathlib import Path
+
+import pytest
+from pyflink.common import Configuration, Encoder
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.file_system import StreamingFileSink
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+
+current_dir = Path(__file__).parent
+_RESOURCES = current_dir.parent / "resources"
+_REPO_ROOT = current_dir.parent.parent.parent.parent
+_TEST_JAR = (
+ _REPO_ROOT
+ / "e2e-test"
+ / "flink-agents-end-to-end-tests-resource-cross-language"
+ / "target"
+ /
"flink-agents-end-to-end-tests-resource-cross-language-0.3-SNAPSHOT-tests.jar"
+)
+
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+
[email protected](
+ not _TEST_JAR.is_file(),
+ reason=(
+ "Cross-language test-jar is missing; run "
+ "'mvn package -DskipTests -pl e2e-test/"
+ "flink-agents-end-to-end-tests-resource-cross-language' first."
+ ),
+)
+def test_yaml_agent_dispatches_java_action_body(tmp_path: Path) -> None:
+ """``load_yaml`` → ``apply(by name)`` with a YAML-declared Java action."""
+ config = Configuration()
+ config.set_string("python.pythonpath", sysconfig.get_paths()["purelib"])
+ env = StreamExecutionEnvironment.get_execution_environment(config)
+ env.set_parallelism(1)
+ env.add_jars(f"file://{_TEST_JAR}")
+
+ input_stream = env.from_collection([1, 2, 3, 4, 5],
type_info=Types.LONG()).map(
+ lambda x: x
+ )
+
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+ agents_env.load_yaml(_RESOURCES / "yaml_cross_language_java_action.yaml")
+ output_datastream = (
+ agents_env.from_datastream(input=input_stream, key_selector=lambda x:
0)
+ .apply("yaml_cross_language_java_action_agent")
+ .to_datastream(Types.LONG())
+ )
+
+ result_dir = tmp_path / "results"
+ result_dir.mkdir(parents=True, exist_ok=True)
+ output_datastream.map(lambda x: str(x), Types.STRING()).add_sink(
+ StreamingFileSink.for_row_format(
+ base_path=str(result_dir.absolute()),
+ encoder=Encoder.simple_string_encoder(),
+ ).build()
+ )
+
+ agents_env.execute()
+
+ actual: list[int] = []
+ for file in result_dir.iterdir():
+ if file.is_dir():
+ for child in file.iterdir():
+ with child.open() as f:
+ actual.extend(int(line.strip()) for line in f if
line.strip())
+ elif file.is_file():
+ with file.open() as f:
+ actual.extend(int(line.strip()) for line in f if line.strip())
+
+ actual.sort()
+ assert actual == [2, 4, 6, 8, 10], f"unexpected outputs: {actual}"
diff --git
a/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml
new file mode 100644
index 00000000..b7acca47
--- /dev/null
+++
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml
@@ -0,0 +1,49 @@
+agents:
+ - name: yaml_actions_in_java_agent
+ description: |
+ Python-host cross-language e2e agent whose orchestration actions are
+ Java. Complements yaml_cross_language_agent (Python actions + Java tool):
+ here the user actions ``process_input`` / ``process_chat_response`` are
+ ``type: java`` and dispatch to ``YamlCrossLanguageActions``, while the
+ Python-native built-in ``chat_model_action`` bridges
ChatRequest→Response.
+ The math path also exercises a cross-language Java tool (calculateBMI).
+
+ chat_model_connections:
+ - name: ollama_connection
+ clazz: ollama
+ request_timeout: 240.0
+ - name: ollama_connection_java
+ clazz: ollama
+ type: java
+ endpoint: http://localhost:11434
+ requestTimeout: 240
+
+ chat_model_setups:
+ - name: math_chat_model
+ clazz: ollama
+ connection: ollama_connection
+ model: qwen3:1.7b
+ tools: [calculateBMI]
+ extract_reasoning: true
+ - name: creative_chat_model
+ clazz: ollama
+ type: java
+ connection: ollama_connection_java
+ model: qwen3:1.7b
+ extract_reasoning: true
+
+ tools:
+ - name: calculateBMI
+ type: java
+ function:
org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent:calculateBMI
+ parameter_types: [java.lang.Double, java.lang.Double]
+
+ actions:
+ - name: process_input
+ type: java
+ function:
org.apache.flink.agents.resource.test.YamlCrossLanguageActions:processInput
+ listen_to: [input]
+ - name: process_chat_response
+ type: java
+ function:
org.apache.flink.agents.resource.test.YamlCrossLanguageActions:processChatResponse
+ listen_to: [chat_response]
diff --git
a/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml
new file mode 100644
index 00000000..d6c9029b
--- /dev/null
+++
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml
@@ -0,0 +1,15 @@
+agents:
+ - name: yaml_cross_language_java_action_agent
+ description: |
+ YAML-driven cross-language e2e agent whose action body is a Java
+ static method. Exercises the Python→Java action bridge: the agent
+ declares a single ``type: java`` action that dispatches into
+ ``JavaActionHandler.multiplyByTwo``. No ``parameter_types`` is
+ written — action signatures are fixed (Event, RunnerContext), so the
+ loader fills them in.
+
+ actions:
+ - name: multiply_by_two
+ type: java
+ function:
org.apache.flink.agents.resource.test.JavaActionHandler:multiplyByTwo
+ listen_to: [input]
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py
b/python/flink_agents/plan/tests/test_agent_plan.py
index 801223e2..9d68f8c1 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -149,14 +149,7 @@ _JAVA_HANDLER_QUALNAME = (
class AgentWithCrossLanguageDecoratedAction(Agent):
@action(
InputEvent.EVENT_TYPE,
- target=JavaFunction(
- qualname=_JAVA_HANDLER_QUALNAME,
- method_name="handleInput",
- parameter_types=[
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
- ],
- ),
+ target=JavaFunction.for_action(_JAVA_HANDLER_QUALNAME, "handleInput"),
)
@staticmethod
def handle(event: Event, ctx: RunnerContext) -> None:
diff --git a/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
b/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
index 9a4b6d34..fbf46711 100644
--- a/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
+++ b/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
@@ -60,14 +60,7 @@ def _dummy_action(event: Event, ctx: RunnerContext) -> None:
def _make_java_function_descriptor() -> ApiJavaFunction:
- return ApiJavaFunction(
- qualname="com.example.Handlers",
- method_name="handle",
- parameter_types=[
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
- ],
- )
+ return ApiJavaFunction.for_action("com.example.Handlers", "handle")
def _make_python_function_descriptor() -> ApiPythonFunction:
diff --git
a/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
b/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
index 0bb1502e..7075fc71 100644
--- a/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
+++ b/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
@@ -36,14 +36,7 @@ def echo_action(event: Event, ctx: RunnerContext) -> None:
def _make_java_function_descriptor() -> ApiJavaFunction:
- return ApiJavaFunction(
- qualname="com.example.Handlers",
- method_name="handle",
- parameter_types=[
- "org.apache.flink.agents.api.Event",
- "org.apache.flink.agents.api.context.RunnerContext",
- ],
- )
+ return ApiJavaFunction.for_action("com.example.Handlers", "handle")
def test_local_runner_dispatches_python_function_action() -> None: