This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new ed04111a [api][test] Simplify cross-language Java action declaration 
and add e2e coverage (#827)
ed04111a is described below

commit ed04111af97970ee2d9d43094fc84f6390fe0423
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jun 10 14:44:49 2026 +0800

    [api][test] Simplify cross-language Java action declaration and add e2e 
coverage (#827)
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 docs/content/docs/development/workflow_agent.md    |  11 +-
 .../test/YamlActionsInPythonCrossLanguageTest.java | 113 ++++++++++++++++
 .../yaml_cross_language_actions_in_python.yaml     |  47 +++++++
 python/flink_agents/api/function.py                |  24 +++-
 .../api/tests/test_agent_add_action.py             |  18 +--
 python/flink_agents/api/tests/test_decorators.py   |   9 +-
 python/flink_agents/api/yaml/loader.py             |  16 +--
 .../python_agent_with_java_action.py               |  10 +-
 .../yaml_actions_in_java_cross_language_test.py    | 148 +++++++++++++++++++++
 .../yaml_java_action_cross_language_test.py        | 107 +++++++++++++++
 .../yaml_cross_language_actions_in_java.yaml       |  49 +++++++
 .../resources/yaml_cross_language_java_action.yaml |  15 +++
 python/flink_agents/plan/tests/test_agent_plan.py  |   9 +-
 .../plan/tests/test_agent_plan_cross_language.py   |   9 +-
 .../tests/test_local_runner_cross_language.py      |   9 +-
 15 files changed, 527 insertions(+), 67 deletions(-)

diff --git a/docs/content/docs/development/workflow_agent.md 
b/docs/content/docs/development/workflow_agent.md
index d15b6e92..b8b7e860 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -543,14 +543,9 @@ from flink_agents.api.function import JavaFunction
 class MyAgent(Agent):
     @action(
         InputEvent.EVENT_TYPE,
-        target=JavaFunction(
-            qualname="com.example.MyHandlers",
-            method_name="handleInput",
-            parameter_types=[
-                "org.apache.flink.agents.api.Event",
-                "org.apache.flink.agents.api.context.RunnerContext",
-            ],
-        ),
+        # Action signatures are fixed (Event, RunnerContext), so for_action
+        # fills the Java parameter types for you — only the class and method.
+        target=JavaFunction.for_action("com.example.MyHandlers", 
"handleInput"),
     )
     @staticmethod
     def handle_input(event: Event, ctx: RunnerContext) -> None:
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java
new file mode 100644
index 00000000..44412980
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/YamlActionsInPythonCrossLanguageTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent.OLLAMA_MODEL;
+import static 
org.apache.flink.agents.resource.test.CrossLanguageTestPreparationUtils.pullModel;
+
+/**
+ * End-to-end test for the Java YAML loader with cross-language orchestration 
actions: a Java host
+ * loads an agent whose user actions {@code process_input} / {@code 
process_chat_response} are
+ * {@code type: python}, while the Java-native built-in chat loop bridges 
ChatRequest→Response.
+ * Companion to {@link YamlCrossLanguageTest} (Java actions + Python tool) — 
here the actions cross
+ * languages and the built-ins stay native. The math path still drives a 
cross-language Python tool
+ * ({@code calculate_bmi}).
+ */
+public class YamlActionsInPythonCrossLanguageTest {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(YamlActionsInPythonCrossLanguageTest.class);
+
+    private final boolean ollamaReady;
+
+    public YamlActionsInPythonCrossLanguageTest() throws IOException {
+        ollamaReady = pullModel(OLLAMA_MODEL);
+    }
+
+    @Test
+    public void testYamlPythonActionsOnJavaHost() throws Exception {
+        Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not 
provided");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<String> inputStream =
+                env.fromData(
+                        "Calculate BMI for someone who is 1.75 meters tall and 
weighs 70 kg",
+                        "Tell me a joke about cats.");
+
+        AgentsExecutionEnvironment agentsEnv =
+                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+        
agentsEnv.loadYaml(yamlFixture("yaml_cross_language_actions_in_python.yaml"));
+
+        DataStream<Object> outputStream =
+                agentsEnv
+                        .fromDataStream(
+                                inputStream, (KeySelector<String, String>) 
value -> "orderKey")
+                        .apply("yaml_actions_in_python_agent")
+                        .toDataStream();
+
+        CloseableIterator<Object> results = outputStream.collectAsync();
+        agentsEnv.execute();
+
+        List<String> responses = new ArrayList<>();
+        while (results.hasNext()) {
+            responses.add(String.valueOf(results.next()));
+        }
+        LOG.info("Python-action cross-language YAML agent responses: {}", 
responses);
+
+        Assertions.assertEquals(
+                2, responses.size(), "expected 2 responses, got " + 
responses.size());
+
+        String joined = String.join("\n", responses).toLowerCase();
+        Assertions.assertTrue(
+                joined.contains("22"), String.format("math answer missing 
'22': %s", responses));
+        Assertions.assertTrue(
+                joined.contains("cat"),
+                String.format("creative answer missing 'cat': %s", responses));
+    }
+
+    private static Path yamlFixture(String name) {
+        URL resource =
+                YamlActionsInPythonCrossLanguageTest.class
+                        .getClassLoader()
+                        .getResource("yaml/" + name);
+        Objects.requireNonNull(resource, "fixture not found on classpath: 
yaml/" + name);
+        return Paths.get(resource.getPath());
+    }
+}
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml
new file mode 100644
index 00000000..61548980
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/resources/yaml/yaml_cross_language_actions_in_python.yaml
@@ -0,0 +1,47 @@
+agents:
+  - name: yaml_actions_in_python_agent
+    description: |
+      Java-host cross-language YAML e2e agent whose orchestration actions are
+      Python. Complements yaml_cross_language_agent (Java actions + Python 
tool):
+      here ``process_input`` / ``process_chat_response`` are ``type: python`` 
and
+      dispatch to ``yaml_cross_language_actions``, while the Java-native 
built-in
+      chat loop bridges ChatRequest→Response. The math path still exercises a
+      cross-language Python tool (calculate_bmi).
+
+    chat_model_connections:
+      - name: ollama_connection_java
+        clazz: ollama
+        type: java
+        endpoint: http://localhost:11434
+        requestTimeout: 240
+      - name: ollama_connection_python
+        clazz: ollama
+        request_timeout: 240
+
+    chat_model_setups:
+      - name: math_chat_model
+        clazz: ollama
+        type: java
+        connection: ollama_connection_java
+        model: qwen3:1.7b
+        tools: [calculate_bmi]
+        extract_reasoning: true
+      - name: creative_chat_model
+        clazz: ollama
+        connection: ollama_connection_python
+        model: qwen3:1.7b
+        extract_reasoning: true
+
+    tools:
+      - name: calculate_bmi
+        function: 
flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:calculate_bmi
+
+    actions:
+      - name: process_input
+        type: python
+        function: 
flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:process_input
+        listen_to: [input]
+      - name: process_chat_response
+        type: python
+        function: 
flink_agents.e2e_tests.e2e_tests_resource_cross_language.yaml_cross_language_actions:process_chat_response
+        listen_to: [chat_response]
diff --git a/python/flink_agents/api/function.py 
b/python/flink_agents/api/function.py
index b5597664..d95375a4 100644
--- a/python/flink_agents/api/function.py
+++ b/python/flink_agents/api/function.py
@@ -25,10 +25,18 @@ method name, and parameter types for Java.
 import importlib
 import inspect
 from abc import ABC
-from typing import Any, Callable, List
+from typing import Any, Callable, List, Tuple
 
 from pydantic import BaseModel, model_serializer
 
+#: Java parameter types of an action method. Action signatures are fixed
+#: ``(Event, RunnerContext)``, so callers never have to spell them out. A tuple
+#: keeps the shared constant immutable; callers copy it into their own list.
+ACTION_PARAMETER_TYPES: Tuple[str, ...] = (
+    "org.apache.flink.agents.api.Event",
+    "org.apache.flink.agents.api.context.RunnerContext",
+)
+
 
 class Function(BaseModel, ABC):
     """Marker base class for function descriptors. Pure data — has no
@@ -99,6 +107,20 @@ class JavaFunction(Function):
     method_name: str
     parameter_types: List[str]
 
+    @classmethod
+    def for_action(cls, qualname: str, method_name: str) -> "JavaFunction":
+        """Build a descriptor for a Java action, filling the fixed signature.
+
+        Actions always take ``(Event, RunnerContext)``, so ``parameter_types``
+        is implied — mirrors the YAML API, which omits it for ``type: java``
+        actions. Tools must still pass ``parameter_types`` to the constructor.
+        """
+        return cls(
+            qualname=qualname,
+            method_name=method_name,
+            parameter_types=list(ACTION_PARAMETER_TYPES),
+        )
+
     @model_serializer
     def __serialize(self) -> dict[str, Any]:
         return {
diff --git a/python/flink_agents/api/tests/test_agent_add_action.py 
b/python/flink_agents/api/tests/test_agent_add_action.py
index 98b448b0..ee7e59f2 100644
--- a/python/flink_agents/api/tests/test_agent_add_action.py
+++ b/python/flink_agents/api/tests/test_agent_add_action.py
@@ -30,14 +30,16 @@ def _dummy_action(event: Event, ctx: RunnerContext) -> None:
 
 
 def _make_java_function() -> JavaFunction:
-    return JavaFunction(
-        qualname="com.example.Handlers",
-        method_name="handle",
-        parameter_types=[
-            "org.apache.flink.agents.api.Event",
-            "org.apache.flink.agents.api.context.RunnerContext",
-        ],
-    )
+    return JavaFunction.for_action("com.example.Handlers", "handle")
+
+
+def test_java_function_for_action_fills_fixed_action_signature() -> None:
+    """``for_action`` omits parameter_types boilerplate; YAML already does 
this."""
+    jf = JavaFunction.for_action("com.example.Handlers", "handle")
+    assert jf.parameter_types == [
+        "org.apache.flink.agents.api.Event",
+        "org.apache.flink.agents.api.context.RunnerContext",
+    ]
 
 
 # ── Descriptor pass-through ─────────────────────────────────────────────
diff --git a/python/flink_agents/api/tests/test_decorators.py 
b/python/flink_agents/api/tests/test_decorators.py
index 6a4b131f..94ff8d86 100644
--- a/python/flink_agents/api/tests/test_decorators.py
+++ b/python/flink_agents/api/tests/test_decorators.py
@@ -94,14 +94,7 @@ def test_action_decorator_rejects_invalid_types() -> None:
 
 
 def _java_target() -> JavaFunction:
-    return JavaFunction(
-        qualname="com.example.Handlers",
-        method_name="handle",
-        parameter_types=[
-            "org.apache.flink.agents.api.Event",
-            "org.apache.flink.agents.api.context.RunnerContext",
-        ],
-    )
+    return JavaFunction.for_action("com.example.Handlers", "handle")
 
 
 def test_action_decorator_with_cross_language_target() -> None:
diff --git a/python/flink_agents/api/yaml/loader.py 
b/python/flink_agents/api/yaml/loader.py
index 2a0ad28d..54e5938e 100644
--- a/python/flink_agents/api/yaml/loader.py
+++ b/python/flink_agents/api/yaml/loader.py
@@ -29,7 +29,12 @@ import yaml
 
 from flink_agents.api.agents.agent import Agent
 from flink_agents.api.chat_message import ChatMessage, MessageRole
-from flink_agents.api.function import Function, JavaFunction, PythonFunction
+from flink_agents.api.function import (
+    ACTION_PARAMETER_TYPES,
+    Function,
+    JavaFunction,
+    PythonFunction,
+)
 from flink_agents.api.prompts.prompt import Prompt
 from flink_agents.api.resource import ResourceDescriptor, ResourceType
 from flink_agents.api.skills import Skills, SkillSourceSpec
@@ -50,13 +55,6 @@ from flink_agents.api.yaml.specs import (
     YamlAgentsDocument,
 )
 
-# Default Java parameter types for an action. Action methods in
-# flink-agents always have signature (Event, RunnerContext).
-_JAVA_ACTION_PARAMETER_TYPES: list[str] = [
-    "org.apache.flink.agents.api.Event",
-    "org.apache.flink.agents.api.context.RunnerContext",
-]
-
 _DESCRIPTOR_TYPES: Dict[str, ResourceType] = {
     "chat_model_connections": ResourceType.CHAT_MODEL_CONNECTION,
     "chat_model_setups": ResourceType.CHAT_MODEL,
@@ -159,7 +157,7 @@ def _add_descriptors_to_agent(
 
 
 def _resolve_action_function(action: ActionSpec) -> Function:
-    parameter_types = _JAVA_ACTION_PARAMETER_TYPES if action.type == "java" 
else None
+    parameter_types = ACTION_PARAMETER_TYPES if action.type == "java" else None
     return resolve_function(
         name=action.name,
         function=action.function,
diff --git 
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
index a59b46fe..f7fb2160 100644
--- 
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
+++ 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/python_agent_with_java_action.py
@@ -25,10 +25,6 @@ from flink_agents.api.function import JavaFunction
 
 JAVA_HANDLER_QUALNAME = 
"org.apache.flink.agents.resource.test.JavaActionHandler"
 JAVA_HANDLER_METHOD = "multiplyByTwo"
-JAVA_HANDLER_PARAMETER_TYPES = [
-    "org.apache.flink.agents.api.Event",
-    "org.apache.flink.agents.api.context.RunnerContext",
-]
 
 
 class PythonAgentWithJavaActionAgent(Agent):
@@ -40,11 +36,7 @@ class PythonAgentWithJavaActionAgent(Agent):
         self.add_action(
             name="multiply_by_two",
             events=[InputEvent.EVENT_TYPE],
-            func=JavaFunction(
-                qualname=JAVA_HANDLER_QUALNAME,
-                method_name=JAVA_HANDLER_METHOD,
-                parameter_types=JAVA_HANDLER_PARAMETER_TYPES,
-            ),
+            func=JavaFunction.for_action(JAVA_HANDLER_QUALNAME, 
JAVA_HANDLER_METHOD),
         )
 
 
diff --git 
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py
 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py
new file mode 100644
index 00000000..770d8505
--- /dev/null
+++ 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_actions_in_java_cross_language_test.py
@@ -0,0 +1,148 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""E2E test: Python-host agent whose orchestration actions are Java.
+
+Companion to ``yaml_cross_language_test.py`` (Python actions + Java tool).
+Here ``process_input`` / ``process_chat_response`` are ``type: java`` and
+dispatch to ``YamlCrossLanguageActions``; the Python-native built-in
+``chat_model_action`` bridges the chat loop, and the math path calls the
+cross-language Java ``calculateBMI`` tool. Exercises a Java user action →
+Python built-in → Java user action round trip.
+
+Skipped when the Ollama client/model or the cross-language test-jar is
+unavailable.
+"""
+
+import os
+import sysconfig
+from pathlib import Path
+
+import pytest
+from pyflink.common import Configuration, Encoder, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
+from pyflink.datastream.connectors.file_system import (
+    FileSource,
+    StreamFormat,
+    StreamingFileSink,
+)
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.e2e_tests.test_utils import (
+    assert_tool_invoked,
+    collect_tool_invocations,
+    pull_model,
+)
+
+current_dir = Path(__file__).parent
+_RESOURCES = current_dir.parent / "resources"
+_REPO_ROOT = current_dir.parent.parent.parent.parent
+_TEST_JAR = (
+    _REPO_ROOT
+    / "e2e-test"
+    / "flink-agents-end-to-end-tests-resource-cross-language"
+    / "target"
+    / 
"flink-agents-end-to-end-tests-resource-cross-language-0.3-SNAPSHOT-tests.jar"
+)
+
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+OLLAMA_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b")
+_client = pull_model(OLLAMA_MODEL)
+
+
[email protected](
+    _client is None,
+    reason="Ollama client is not available or test model is missing.",
+)
[email protected](
+    not _TEST_JAR.is_file(),
+    reason=(
+        "Cross-language test-jar is missing; run "
+        "'mvn package -DskipTests -pl e2e-test/"
+        "flink-agents-end-to-end-tests-resource-cross-language' first."
+    ),
+)
+def test_yaml_python_host_with_java_actions(
+    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+) -> None:
+    """``load_yaml`` agent whose process_* actions are Java, bridged by 
Python."""
+    monkeypatch.setenv("OLLAMA_CHAT_MODEL", OLLAMA_MODEL)
+    config = Configuration()
+    config.set_string("python.pythonpath", sysconfig.get_paths()["purelib"])
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+    env.add_jars(f"file://{_TEST_JAR}")
+
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(),
+            f"file:///{_RESOURCES}/yaml_cross_language_input",
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="yaml_actions_in_java_source",
+    )
+    deserialize_datastream = input_datastream.map(lambda x: str(x))
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    log_dir = tmp_path / "event_logs"
+    log_dir.mkdir(parents=True, exist_ok=True)
+    agents_env.get_config().set_str("baseLogDir", str(log_dir))
+    agents_env.load_yaml(_RESOURCES / 
"yaml_cross_language_actions_in_java.yaml")
+
+    output_datastream = (
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=lambda x: "orderKey"
+        )
+        .apply("yaml_actions_in_java_agent")
+        .to_datastream()
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+    output_datastream.map(
+        lambda x: str(x).replace("\n", "").replace("\r", ""), Types.STRING()
+    ).add_sink(
+        StreamingFileSink.for_row_format(
+            base_path=str(result_dir.absolute()),
+            encoder=Encoder.simple_string_encoder(),
+        ).build()
+    )
+
+    agents_env.execute()
+
+    actual_result = []
+    for file in result_dir.iterdir():
+        if file.is_dir():
+            for child in file.iterdir():
+                with child.open() as f:
+                    actual_result.extend(f.readlines())
+        if file.is_file():
+            with file.open() as f:
+                actual_result.extend(f.readlines())
+
+    # Math path went through the Java calculateBMI tool — cross-language tool.
+    assert_tool_invoked(
+        collect_tool_invocations(log_dir),
+        "calculateBMI",
+        {"weightKg": 70, "heightM": 1.75},
+    )
+    # Creative path uses no tool; its answer mentions a cat.
+    joined = "\n".join(actual_result).lower()
+    assert "cat" in joined, f"creative answer missing 'cat': {actual_result!r}"
diff --git 
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py
 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py
new file mode 100644
index 00000000..37303c0f
--- /dev/null
+++ 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_java_action_cross_language_test.py
@@ -0,0 +1,107 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""E2E test: a YAML-declared agent whose action body is a Java method.
+
+Companion to ``yaml_cross_language_test.py`` (Java *tool*) — this covers
+the Java *action* path. Parses 
``resources/yaml_cross_language_java_action.yaml``
+via ``AgentsExecutionEnvironment.load_yaml`` and runs the declared agent.
+The single ``type: java`` action dispatches into
+``org.apache.flink.agents.resource.test.JavaActionHandler.multiplyByTwo``,
+mirroring ``python_agent_with_java_action_test`` but driven by YAML. The
+YAML omits ``parameter_types`` (actions have a fixed signature), so this
+also guards the loader's auto-fill. No chat model is involved, so it runs
+whenever the cross-language test-jar is present.
+"""
+
+import os
+import sysconfig
+from pathlib import Path
+
+import pytest
+from pyflink.common import Configuration, Encoder
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.file_system import StreamingFileSink
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+
+current_dir = Path(__file__).parent
+_RESOURCES = current_dir.parent / "resources"
+_REPO_ROOT = current_dir.parent.parent.parent.parent
+_TEST_JAR = (
+    _REPO_ROOT
+    / "e2e-test"
+    / "flink-agents-end-to-end-tests-resource-cross-language"
+    / "target"
+    / 
"flink-agents-end-to-end-tests-resource-cross-language-0.3-SNAPSHOT-tests.jar"
+)
+
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+
[email protected](
+    not _TEST_JAR.is_file(),
+    reason=(
+        "Cross-language test-jar is missing; run "
+        "'mvn package -DskipTests -pl e2e-test/"
+        "flink-agents-end-to-end-tests-resource-cross-language' first."
+    ),
+)
+def test_yaml_agent_dispatches_java_action_body(tmp_path: Path) -> None:
+    """``load_yaml`` → ``apply(by name)`` with a YAML-declared Java action."""
+    config = Configuration()
+    config.set_string("python.pythonpath", sysconfig.get_paths()["purelib"])
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_parallelism(1)
+    env.add_jars(f"file://{_TEST_JAR}")
+
+    input_stream = env.from_collection([1, 2, 3, 4, 5], 
type_info=Types.LONG()).map(
+        lambda x: x
+    )
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    agents_env.load_yaml(_RESOURCES / "yaml_cross_language_java_action.yaml")
+    output_datastream = (
+        agents_env.from_datastream(input=input_stream, key_selector=lambda x: 
0)
+        .apply("yaml_cross_language_java_action_agent")
+        .to_datastream(Types.LONG())
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+    output_datastream.map(lambda x: str(x), Types.STRING()).add_sink(
+        StreamingFileSink.for_row_format(
+            base_path=str(result_dir.absolute()),
+            encoder=Encoder.simple_string_encoder(),
+        ).build()
+    )
+
+    agents_env.execute()
+
+    actual: list[int] = []
+    for file in result_dir.iterdir():
+        if file.is_dir():
+            for child in file.iterdir():
+                with child.open() as f:
+                    actual.extend(int(line.strip()) for line in f if 
line.strip())
+        elif file.is_file():
+            with file.open() as f:
+                actual.extend(int(line.strip()) for line in f if line.strip())
+
+    actual.sort()
+    assert actual == [2, 4, 6, 8, 10], f"unexpected outputs: {actual}"
diff --git 
a/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml
 
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml
new file mode 100644
index 00000000..b7acca47
--- /dev/null
+++ 
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_actions_in_java.yaml
@@ -0,0 +1,49 @@
+agents:
+  - name: yaml_actions_in_java_agent
+    description: |
+      Python-host cross-language e2e agent whose orchestration actions are
+      Java. Complements yaml_cross_language_agent (Python actions + Java tool):
+      here the user actions ``process_input`` / ``process_chat_response`` are
+      ``type: java`` and dispatch to ``YamlCrossLanguageActions``, while the
+      Python-native built-in ``chat_model_action`` bridges 
ChatRequest→Response.
+      The math path also exercises a cross-language Java tool (calculateBMI).
+
+    chat_model_connections:
+      - name: ollama_connection
+        clazz: ollama
+        request_timeout: 240.0
+      - name: ollama_connection_java
+        clazz: ollama
+        type: java
+        endpoint: http://localhost:11434
+        requestTimeout: 240
+
+    chat_model_setups:
+      - name: math_chat_model
+        clazz: ollama
+        connection: ollama_connection
+        model: qwen3:1.7b
+        tools: [calculateBMI]
+        extract_reasoning: true
+      - name: creative_chat_model
+        clazz: ollama
+        type: java
+        connection: ollama_connection_java
+        model: qwen3:1.7b
+        extract_reasoning: true
+
+    tools:
+      - name: calculateBMI
+        type: java
+        function: 
org.apache.flink.agents.resource.test.ChatModelCrossLanguageAgent:calculateBMI
+        parameter_types: [java.lang.Double, java.lang.Double]
+
+    actions:
+      - name: process_input
+        type: java
+        function: 
org.apache.flink.agents.resource.test.YamlCrossLanguageActions:processInput
+        listen_to: [input]
+      - name: process_chat_response
+        type: java
+        function: 
org.apache.flink.agents.resource.test.YamlCrossLanguageActions:processChatResponse
+        listen_to: [chat_response]
diff --git 
a/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml 
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml
new file mode 100644
index 00000000..d6c9029b
--- /dev/null
+++ 
b/python/flink_agents/e2e_tests/resources/yaml_cross_language_java_action.yaml
@@ -0,0 +1,15 @@
+agents:
+  - name: yaml_cross_language_java_action_agent
+    description: |
+      YAML-driven cross-language e2e agent whose action body is a Java
+      static method. Exercises the Python→Java action bridge: the agent
+      declares a single ``type: java`` action that dispatches into
+      ``JavaActionHandler.multiplyByTwo``. No ``parameter_types`` is
+      written — action signatures are fixed (Event, RunnerContext), so the
+      loader fills them in.
+
+    actions:
+      - name: multiply_by_two
+        type: java
+        function: 
org.apache.flink.agents.resource.test.JavaActionHandler:multiplyByTwo
+        listen_to: [input]
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py 
b/python/flink_agents/plan/tests/test_agent_plan.py
index 801223e2..9d68f8c1 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -149,14 +149,7 @@ _JAVA_HANDLER_QUALNAME = (
 class AgentWithCrossLanguageDecoratedAction(Agent):
     @action(
         InputEvent.EVENT_TYPE,
-        target=JavaFunction(
-            qualname=_JAVA_HANDLER_QUALNAME,
-            method_name="handleInput",
-            parameter_types=[
-                "org.apache.flink.agents.api.Event",
-                "org.apache.flink.agents.api.context.RunnerContext",
-            ],
-        ),
+        target=JavaFunction.for_action(_JAVA_HANDLER_QUALNAME, "handleInput"),
     )
     @staticmethod
     def handle(event: Event, ctx: RunnerContext) -> None:
diff --git a/python/flink_agents/plan/tests/test_agent_plan_cross_language.py 
b/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
index 9a4b6d34..fbf46711 100644
--- a/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
+++ b/python/flink_agents/plan/tests/test_agent_plan_cross_language.py
@@ -60,14 +60,7 @@ def _dummy_action(event: Event, ctx: RunnerContext) -> None:
 
 
 def _make_java_function_descriptor() -> ApiJavaFunction:
-    return ApiJavaFunction(
-        qualname="com.example.Handlers",
-        method_name="handle",
-        parameter_types=[
-            "org.apache.flink.agents.api.Event",
-            "org.apache.flink.agents.api.context.RunnerContext",
-        ],
-    )
+    return ApiJavaFunction.for_action("com.example.Handlers", "handle")
 
 
 def _make_python_function_descriptor() -> ApiPythonFunction:
diff --git 
a/python/flink_agents/runtime/tests/test_local_runner_cross_language.py 
b/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
index 0bb1502e..7075fc71 100644
--- a/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
+++ b/python/flink_agents/runtime/tests/test_local_runner_cross_language.py
@@ -36,14 +36,7 @@ def echo_action(event: Event, ctx: RunnerContext) -> None:
 
 
 def _make_java_function_descriptor() -> ApiJavaFunction:
-    return ApiJavaFunction(
-        qualname="com.example.Handlers",
-        method_name="handle",
-        parameter_types=[
-            "org.apache.flink.agents.api.Event",
-            "org.apache.flink.agents.api.context.RunnerContext",
-        ],
-    )
+    return ApiJavaFunction.for_action("com.example.Handlers", "handle")
 
 
 def test_local_runner_dispatches_python_function_action() -> None:

Reply via email to