weiqingy commented on code in PR #639:
URL: https://github.com/apache/flink-agents/pull/639#discussion_r3300087498
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -271,6 +272,8 @@ public void open() throws Exception {
shortTermMemState =
getRuntimeContext().getMapState(shortTermMemStateDescriptor);
resourceCache = new ResourceCache(agentPlan.getResourceProviders());
+ JavaMCPResourceDiscovery.discoverJavaMCPResources(
Review Comment:
`JavaMCPResourceDiscovery.discoverJavaMCPResources` runs synchronously here,
which means a slow or briefly-unreachable MCP server stalls (or fails) the
entire operator's startup. Discussion #543 anticipates this with
`failFastOnStartup(true)` as the default and a graceful-degradation opt-in —
this PR ships fail-fast only. Is the intent to defer the opt-in to a follow-up,
and does the current shape leave room for it (e.g., a per-server `try/catch`
boundary around the discovery loop)?
##########
runtime/src/main/java/org/apache/flink/agents/runtime/JavaMCPResourceDiscovery.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.plan.resourceprovider.JavaResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import static org.apache.flink.agents.api.resource.ResourceType.MCP_SERVER;
+import static org.apache.flink.agents.api.resource.ResourceType.PROMPT;
+import static org.apache.flink.agents.api.resource.ResourceType.TOOL;
+
+/**
+ * Discovers tools and prompts from Java MCP servers and registers them in a
ResourceCache.
+ *
+ * <p>Called once during operator initialization, immediately after the
ResourceCache is created.
+ * Uses reflection throughout to preserve Java 11 compatibility (MCP classes
are conditionally
+ * compiled for Java 17+).
+ */
+public class JavaMCPResourceDiscovery {
+
+ /**
+ * Initializes Java MCP servers from the resource providers, extracts
their tools and prompts,
+ * and registers them in the cache.
+ *
+ * @param resourceProviders the resource providers from the agent plan
+ * @param cache the resource cache to register discovered resources in
+ * @throws Exception if a Java MCP server fails to initialize or discovery
fails
+ */
+ public static void discoverJavaMCPResources(
+ Map<ResourceType, Map<String, ResourceProvider>>
resourceProviders, ResourceCache cache)
+ throws Exception {
+
+ Map<String, ResourceProvider> servers =
resourceProviders.get(MCP_SERVER);
+ if (servers == null) {
+ return;
+ }
+
+ for (ResourceProvider rp : servers.values()) {
+ if (!(rp instanceof JavaResourceProvider)) {
+ continue;
+ }
+
+ Object mcpServer = rp.provide(null);
Review Comment:
`rp.provide(null)` passes `null` as `ResourceContext`.
`PythonMCPResourceDiscovery` at `PythonMCPResourceDiscovery.java:73` passes
`cache.getResourceContext()` for the same role. Today `MCPServer`'s constructor
just stores the field without dereferencing it, so the null is benign — but if
a future Java MCP server resolves a dependent resource through
`getResourceContext()`, it would NPE here in a way that doesn't manifest on the
Python side. Any reason to keep the null rather than match the Python path?
##########
plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java:
##########
@@ -222,116 +221,148 @@ void discoverMCPServer() {
@Test
@DisabledOnJre(JRE.JAVA_11)
- @DisplayName("Discover and register tools from MCP server")
+ @DisplayName("Tools are NOT in AgentPlan providers — discovery is deferred
to operator startup")
void discoverToolsFromMCPServer() {
Map<ResourceType, Map<String, ResourceProvider>> providers =
agentPlan.getResourceProviders();
- assertTrue(providers.containsKey(ResourceType.TOOL));
-
- Map<String, ?> toolProviders = providers.get(ResourceType.TOOL);
- assertTrue(toolProviders.containsKey("add"), "add tool should be
discovered");
- assertEquals(1, toolProviders.size(), "Should have exactly 1 tool from
Python server");
+ // Tools are discovered at runtime by JavaMCPResourceDiscovery, not
during plan construction
+ assertNull(
+ providers.get(ResourceType.TOOL),
+ "TOOL providers should be absent from AgentPlan; discovery is
deferred to runtime");
}
@Test
@DisabledOnJre(JRE.JAVA_11)
- @DisplayName("Discover and register prompts from MCP server")
+ @DisplayName(
+ "Prompts are NOT in AgentPlan providers — discovery is deferred to
operator startup")
void discoverPromptsFromMCPServer() {
Map<ResourceType, Map<String, ResourceProvider>> providers =
agentPlan.getResourceProviders();
- assertTrue(providers.containsKey(ResourceType.PROMPT));
-
- Map<String, ?> promptProviders = providers.get(ResourceType.PROMPT);
- assertTrue(promptProviders.containsKey("ask_sum"), "ask_sum prompt
should be discovered");
- assertEquals(1, promptProviders.size(), "Should have exactly 1 prompt
from Python server");
+ // Prompts are discovered at runtime by JavaMCPResourceDiscovery, not
during plan
+ // construction
+ assertNull(
+ providers.get(ResourceType.PROMPT),
+ "PROMPT providers should be absent from AgentPlan; discovery
is deferred to runtime");
}
@Test
@DisabledOnJre(JRE.JAVA_11)
- @DisplayName("Retrieve MCP tool from AgentPlan - add tool")
+ @DisplayName("Retrieve MCP tool at runtime - add tool")
void retrieveMCPToolAdd() throws Exception {
- Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
- assertNotNull(tool);
- assertInstanceOf(MCPTool.class, tool);
-
- MCPTool mcpTool = (MCPTool) tool;
- assertEquals("add", mcpTool.getName());
- // Verify description starts with expected text
- assertTrue(
- mcpTool.getMetadata()
- .getDescription()
- .startsWith("Get the detailed information of a
specified IP address."),
- "Description should start with expected text");
- // Verify input schema contains expected parameters
- String schema = mcpTool.getMetadata().getInputSchema();
- assertTrue(schema.contains("a"), "Schema should contain parameter
'a'");
- assertTrue(schema.contains("b"), "Schema should contain parameter
'b'");
+ MCPServer server = instantiateMCPServer();
+ try {
+ MCPTool tool = null;
+ for (MCPTool t : server.listTools()) {
+ if ("add".equals(t.getName())) {
+ tool = t;
+ break;
+ }
+ }
+ assertNotNull(tool, "add tool should be discoverable from
MCPServer");
+ assertInstanceOf(MCPTool.class, tool);
+ assertEquals("add", tool.getName());
+ assertTrue(
+ tool.getMetadata()
+ .getDescription()
+ .startsWith("Get the detailed information of a
specified IP address."),
+ "Description should start with expected text");
+ String schema = tool.getMetadata().getInputSchema();
+ assertTrue(schema.contains("a"), "Schema should contain parameter
'a'");
+ assertTrue(schema.contains("b"), "Schema should contain parameter
'b'");
+ } finally {
+ server.close();
+ }
}
@Test
@DisabledOnJre(JRE.JAVA_11)
- @DisplayName("Retrieve MCP prompt from AgentPlan - ask_sum")
+ @DisplayName("Retrieve MCP prompt at runtime - ask_sum")
void retrieveMCPPromptAskSum() throws Exception {
- Prompt prompt = (Prompt) resolveResource("ask_sum",
ResourceType.PROMPT);
- assertNotNull(prompt);
- assertInstanceOf(MCPPrompt.class, prompt);
-
- MCPPrompt mcpPrompt = (MCPPrompt) prompt;
- assertEquals("ask_sum", mcpPrompt.getName());
- assertEquals("Prompt of add tool.", mcpPrompt.getDescription());
- // ask_sum prompt should have 'a' and 'b' as arguments
- Map<String, MCPPrompt.PromptArgument> args =
mcpPrompt.getPromptArguments();
- assertTrue(args.containsKey("a"), "Should have 'a' argument");
- assertTrue(args.containsKey("b"), "Should have 'b' argument");
+ MCPServer server = instantiateMCPServer();
+ try {
+ MCPPrompt prompt = null;
+ for (MCPPrompt p : server.listPrompts()) {
+ if ("ask_sum".equals(p.getName())) {
+ prompt = p;
+ break;
+ }
+ }
+ assertNotNull(prompt, "ask_sum prompt should be discoverable from
MCPServer");
+ assertInstanceOf(MCPPrompt.class, prompt);
+ assertEquals("ask_sum", prompt.getName());
+ assertEquals("Prompt of add tool.", prompt.getDescription());
+ Map<String, MCPPrompt.PromptArgument> args =
prompt.getPromptArguments();
+ assertTrue(args.containsKey("a"), "Should have 'a' argument");
+ assertTrue(args.containsKey("b"), "Should have 'b' argument");
+ } finally {
+ server.close();
+ }
}
@Test
@DisabledOnJre(JRE.JAVA_11)
- @DisplayName("AgentPlan JSON serialization with MCP resources")
+ @DisplayName(
+ "AgentPlan JSON serialization contains MCPServer descriptor, not
tool/prompt entries")
void testAgentPlanJsonSerializableWithMCP() throws Exception {
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(agentPlan);
- // Verify JSON contains MCP resources
- assertTrue(json.contains("add"), "JSON should contain add tool");
- assertTrue(json.contains("ask_sum"), "JSON should contain ask_sum
prompt");
+ // Serialized plan contains the MCPServer configuration
assertTrue(json.contains("mcp_server"), "JSON should contain
mcp_server type");
+ assertTrue(json.contains("testMcpServer"), "JSON should contain the
server provider name");
+ assertTrue(json.contains(MCP_ENDPOINT), "JSON should contain the
endpoint");
+
+ // Tools and prompts are NOT serialized into the plan (they are
runtime-discovered)
+ assertFalse(
+ json.contains("\"add\"") && json.contains("java_serializable"),
Review Comment:
```java
assertFalse(
json.contains("\"add\"") && json.contains("java_serializable"),
"JSON should not contain a serialized 'add' tool provider");
```
This passes when *either* substring is missing, so a regression that drops
`"add"` from the JSON for an unrelated reason would silently make this test
green — the very class of bug the test is meant to catch. Since the design
contract you're locking down is "no `java_serializable` providers for
MCP-discovered resources at all," one alternative in case it helps:
```java
assertFalse(json.contains("java_serializable"),
"JSON should not contain any java_serializable provider entries (MCP
discovery is deferred)");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]