wenjin272 commented on code in PR #356:
URL: https://github.com/apache/flink-agents/pull/356#discussion_r2622419914


##########
runtime/pom.xml:
##########
@@ -134,4 +134,56 @@ under the License.
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>mcp-disabled</id>
+            <activation>
+                <jdk>[11,17)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <configuration>
+                            <testExcludes>
+                                
<testExclude>org/apache/flink/agents/runtime/mcp/**/*.java</testExclude>
+                            </testExcludes>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <version>3.0.0-M5</version>
+                        <configuration>
+                            <argLine>
+                                --add-opens java.base/java.util=ALL-UNNAMED
+                                --add-opens java.base/java.lang=ALL-UNNAMED
+                            </argLine>
+                            <excludes>
+                                <exclude>**/mcp/**/*Test.java</exclude>
+                            </excludes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>3.0.0-M5</version>
+                <configuration>
+                    <argLine>
+                        --add-opens java.base/java.util=ALL-UNNAMED
+                        --add-opens java.base/java.lang=ALL-UNNAMED
+                    </argLine>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

Review Comment:
   If we move the mcp test to api, this profile can be removed. 
   Besides, the project root pom has add surefire options, maybe we don't need 
add again here.



##########
plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java:
##########
@@ -63,6 +63,8 @@
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.flink.agents.api.resource.ResourceType.*;

Review Comment:
   In flink, we recommend avoid star import.



##########
api/src/main/java/org/apache/flink/agents/api/mcp/MCPPrompt.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.mcp;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.prompt.Prompt;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * MCP prompt definition that extends the base Prompt class.
+ *
+ * <p>This represents a prompt managed by an MCP server. Unlike static 
prompts, MCP prompts are
+ * fetched dynamically from the server and can accept arguments.
+ */
+public class MCPPrompt extends Prompt {
+
+    private static final String FIELD_NAME = "name";
+    private static final String FIELD_DESCRIPTION = "description";
+    private static final String FIELD_ARGUMENTS = "arguments";
+    private static final String FIELD_MCP_SERVER = "mcpServer";
+
+    @JsonProperty(FIELD_NAME)
+    private final String name;
+
+    @JsonProperty(FIELD_DESCRIPTION)
+    private final String description;
+
+    @JsonProperty(FIELD_ARGUMENTS)
+    private final Map<String, PromptArgument> promptArguments;
+
+    @JsonProperty(FIELD_MCP_SERVER)
+    private final MCPServer mcpServer;
+
+    /** Represents an argument that can be passed to an MCP prompt. */
+    public static class PromptArgument {
+        @JsonProperty("name")
+        private final String name;
+
+        @JsonProperty("description")
+        private final String description;
+
+        @JsonProperty("required")
+        private final boolean required;
+
+        @JsonCreator
+        public PromptArgument(
+                @JsonProperty("name") String name,
+                @JsonProperty("description") String description,
+                @JsonProperty("required") boolean required) {
+            this.name = Objects.requireNonNull(name, "name cannot be null");
+            this.description = description;
+            this.required = required;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getDescription() {
+            return description;
+        }
+
+        public boolean isRequired() {
+            return required;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            PromptArgument that = (PromptArgument) o;
+            return required == that.required
+                    && Objects.equals(name, that.name)
+                    && Objects.equals(description, that.description);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, description, required);
+        }
+    }
+
+    /**
+     * Create a new MCPPrompt.
+     *
+     * @param name The prompt name
+     * @param description The prompt description
+     * @param promptArguments Map of argument names to argument definitions
+     * @param mcpServer The MCP server reference
+     */
+    @JsonCreator
+    public MCPPrompt(
+            @JsonProperty(FIELD_NAME) String name,
+            @JsonProperty(FIELD_DESCRIPTION) String description,
+            @JsonProperty(FIELD_ARGUMENTS) Map<String, PromptArgument> 
promptArguments,
+            @JsonProperty(FIELD_MCP_SERVER) MCPServer mcpServer) {
+        this.name = Objects.requireNonNull(name, "name cannot be null");
+        this.description = description;
+        this.promptArguments =
+                promptArguments != null ? new HashMap<>(promptArguments) : new 
HashMap<>();
+        this.mcpServer = Objects.requireNonNull(mcpServer, "mcpServer cannot 
be null");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public Map<String, PromptArgument> getPromptArguments() {
+        return new HashMap<>(promptArguments);
+    }
+
+    @JsonIgnore
+    public MCPServer getMcpServer() {
+        return mcpServer;
+    }
+
+    /**
+     * Format the prompt as a string with the given arguments. Overrides the 
base Prompt class to
+     * fetch prompts from the MCP server.
+     *
+     * @param arguments Arguments to pass to the prompt (String keys and 
values)
+     * @return The formatted prompt as a string
+     */
+    @Override
+    public String formatString(Map<String, String> arguments) {
+        List<ChatMessage> messages = formatMessages(MessageRole.SYSTEM, 
arguments);
+        return messages.stream()
+                .map(msg -> msg.getRole().getValue() + ": " + msg.getContent())
+                .collect(Collectors.joining("\n"));
+    }
+
+    /**
+     * Format the prompt as a list of chat messages with the given arguments. 
Overrides the base
+     * Prompt class to fetch prompts from the MCP server.
+     *
+     * @param defaultRole The default role for messages (usually SYSTEM)
+     * @param kwargs Arguments to pass to the prompt (String keys and values)
+     * @return List of formatted chat messages
+     */
+    @Override
+    public List<ChatMessage> formatMessages(MessageRole defaultRole, 
Map<String, String> kwargs) {
+        Map<String, Object> objectArgs = new HashMap<>(kwargs);
+        return formatMessages(objectArgs);
+    }
+
+    /**
+     * Format the prompt as a list of chat messages with the given arguments. 
This overloaded
+     * version accepts Object values for flexibility.
+     *
+     * @param arguments Arguments to pass to the prompt (Object values)
+     * @return List of formatted chat messages
+     */
+    public List<ChatMessage> formatMessages(Map<String, Object> arguments) {

Review Comment:
   Maybe this method can be private



##########
api/src/main/java/org/apache/flink/agents/api/mcp/MCPServer.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.mcp;
+
+import io.modelcontextprotocol.client.McpClient;
+import io.modelcontextprotocol.client.McpSyncClient;
+import 
io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.mcp.auth.ApiKeyAuth;
+import org.apache.flink.agents.api.mcp.auth.Auth;
+import org.apache.flink.agents.api.mcp.auth.BasicAuth;
+import org.apache.flink.agents.api.mcp.auth.BearerTokenAuth;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Resource representing an MCP server and exposing its tools/prompts.
+ *
+ * <p>This is a logical container for MCP tools and prompts; it is not 
directly invokable. It uses
+ * the official MCP Java SDK to communicate with MCP servers via HTTP/SSE.
+ *
+ * <p>Authentication is supported through the {@link Auth} interface with 
multiple implementations:
+ *
+ * <ul>
+ *   <li>{@link BearerTokenAuth} - For OAuth 2.0 and JWT tokens
+ *   <li>{@link BasicAuth} - For username/password authentication
+ *   <li>{@link ApiKeyAuth} - For API key authentication via custom headers
+ * </ul>
+ *
+ * <p>Example with OAuth authentication:
+ *
+ * <pre>{@code
+ * MCPServer server = MCPServer.builder("https://api.example.com/mcp";)
+ *     .auth(new BearerTokenAuth("your-oauth-token"))
+ *     .timeout(Duration.ofSeconds(30))
+ *     .build();
+ *
+ * List<MCPTool> tools = server.listTools();
+ * server.close();
+ * }</pre>
+ *
+ * <p>Reference: <a 
href="https://modelcontextprotocol.io/sdk/java/mcp-client";>MCP Java Client</a>
+ */
+public class MCPServer extends SerializableResource {
+
+    private static final String FIELD_ENDPOINT = "endpoint";
+    private static final String FIELD_HEADERS = "headers";
+    private static final String FIELD_TIMEOUT_SECONDS = "timeoutSeconds";
+    private static final String FIELD_SSE_READ_TIMEOUT_SECONDS = 
"sseReadTimeoutSeconds";

Review Comment:
   This field is never used.



##########
api/src/test/java/org/apache/flink/agents/api/mcp/MCPServerTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.mcp;
+
+import org.apache.flink.agents.api.mcp.auth.ApiKeyAuth;
+import org.apache.flink.agents.api.mcp.auth.BasicAuth;
+import org.apache.flink.agents.api.mcp.auth.BearerTokenAuth;
+import org.apache.flink.agents.api.resource.ResourceType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link MCPServer}. */
+class MCPServerTest {
+
+    @Test
+    @DisabledOnJre(JRE.JAVA_11)
+    @DisplayName("Create MCPServer with builder")
+    void testBuilderCreation() {
+        MCPServer server =
+                MCPServer.builder("http://localhost:8000/mcp";)
+                        .header("X-Custom-Header", "value")
+                        .timeout(Duration.ofSeconds(30))
+                        .auth(new BearerTokenAuth("test-token"))
+                        .build();
+
+        
assertThat(server.getEndpoint()).isEqualTo("http://localhost:8000/mcp";);
+        assertThat(server.getHeaders()).containsEntry("X-Custom-Header", 
"value");
+        assertThat(server.getTimeoutSeconds()).isEqualTo(30);
+        assertThat(server.getAuth()).isInstanceOf(BearerTokenAuth.class);
+    }
+
+    @Test
+    @DisabledOnJre(JRE.JAVA_11)
+    @DisplayName("Create MCPServer with simple constructor")
+    void testSimpleConstructor() {
+        MCPServer server = new MCPServer("http://localhost:8000/mcp";);

Review Comment:
   The endpoint `http://localhost:8000/mcp` is used in each test case, maybe we 
can extract it as a literal.



##########
api/src/main/java/org/apache/flink/agents/api/mcp/MCPServer.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.mcp;
+
+import io.modelcontextprotocol.client.McpClient;
+import io.modelcontextprotocol.client.McpSyncClient;
+import 
io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.mcp.auth.ApiKeyAuth;
+import org.apache.flink.agents.api.mcp.auth.Auth;
+import org.apache.flink.agents.api.mcp.auth.BasicAuth;
+import org.apache.flink.agents.api.mcp.auth.BearerTokenAuth;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Resource representing an MCP server and exposing its tools/prompts.
+ *
+ * <p>This is a logical container for MCP tools and prompts; it is not 
directly invokable. It uses
+ * the official MCP Java SDK to communicate with MCP servers via HTTP/SSE.
+ *
+ * <p>Authentication is supported through the {@link Auth} interface with 
multiple implementations:
+ *
+ * <ul>
+ *   <li>{@link BearerTokenAuth} - For OAuth 2.0 and JWT tokens
+ *   <li>{@link BasicAuth} - For username/password authentication
+ *   <li>{@link ApiKeyAuth} - For API key authentication via custom headers
+ * </ul>
+ *
+ * <p>Example with OAuth authentication:
+ *
+ * <pre>{@code
+ * MCPServer server = MCPServer.builder("https://api.example.com/mcp";)
+ *     .auth(new BearerTokenAuth("your-oauth-token"))
+ *     .timeout(Duration.ofSeconds(30))
+ *     .build();
+ *
+ * List<MCPTool> tools = server.listTools();
+ * server.close();
+ * }</pre>
+ *
+ * <p>Reference: <a 
href="https://modelcontextprotocol.io/sdk/java/mcp-client";>MCP Java Client</a>
+ */
+public class MCPServer extends SerializableResource {
+
+    private static final String FIELD_ENDPOINT = "endpoint";
+    private static final String FIELD_HEADERS = "headers";
+    private static final String FIELD_TIMEOUT_SECONDS = "timeoutSeconds";
+    private static final String FIELD_SSE_READ_TIMEOUT_SECONDS = 
"sseReadTimeoutSeconds";
+    private static final String FIELD_AUTH = "auth";
+
+    @JsonProperty(FIELD_ENDPOINT)
+    private final String endpoint;
+
+    @JsonProperty(FIELD_HEADERS)
+    private final Map<String, String> headers;
+
+    @JsonProperty(FIELD_TIMEOUT_SECONDS)
+    private final long timeoutSeconds;
+
+    @JsonProperty(FIELD_AUTH)
+    private final Auth auth;
+
+    @JsonIgnore private transient McpSyncClient client;
+
+    /** Builder for MCPServer with fluent API. */
+    public static class Builder {
+        private String endpoint;
+        private final Map<String, String> headers = new HashMap<>();
+        private long timeoutSeconds = 30;
+        private Auth auth = null;
+
+        public Builder endpoint(String endpoint) {
+            this.endpoint = endpoint;
+            return this;
+        }
+
+        public Builder header(String key, String value) {
+            this.headers.put(key, value);
+            return this;
+        }
+
+        public Builder headers(Map<String, String> headers) {
+            this.headers.putAll(headers);
+            return this;
+        }
+
+        public Builder timeout(Duration timeout) {
+            this.timeoutSeconds = timeout.getSeconds();
+            return this;
+        }
+
+        public Builder auth(Auth auth) {
+            this.auth = auth;
+            return this;
+        }
+
+        public MCPServer build() {
+            return new MCPServer(endpoint, headers, timeoutSeconds, auth);
+        }
+    }
+
+    /**
+     * Creates a new MCPServer instance.
+     *
+     * @param endpoint The HTTP endpoint of the MCP server
+     */
+    public MCPServer(String endpoint) {
+        this(endpoint, new HashMap<>(), 30, null);
+    }
+
+    @JsonCreator
+    public MCPServer(
+            @JsonProperty(FIELD_ENDPOINT) String endpoint,
+            @JsonProperty(FIELD_HEADERS) Map<String, String> headers,
+            @JsonProperty(FIELD_TIMEOUT_SECONDS) long timeoutSeconds,
+            @JsonProperty(FIELD_AUTH) Auth auth) {
+        this.endpoint = Objects.requireNonNull(endpoint, "endpoint cannot be 
null");
+        this.headers = headers != null ? new HashMap<>(headers) : new 
HashMap<>();
+        this.timeoutSeconds = timeoutSeconds;
+        this.auth = auth;
+    }
+
+    public static Builder builder(String endpoint) {
+        return new Builder().endpoint(endpoint);
+    }
+
+    @Override
+    @JsonIgnore
+    public ResourceType getResourceType() {
+        return ResourceType.MCP_SERVER;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public Map<String, String> getHeaders() {
+        return new HashMap<>(headers);
+    }
+
+    public long getTimeoutSeconds() {
+        return timeoutSeconds;
+    }
+
+    public Auth getAuth() {
+        return auth;
+    }
+
+    /**
+     * Get or create a synchronized MCP client.
+     *
+     * @return The MCP sync client
+     */
+    @JsonIgnore
+    private synchronized McpSyncClient getClient() {
+        if (client == null) {
+            client = createClient();
+        }
+        return client;
+    }
+
+    /**
+     * Create a new MCP client with the configured transport.
+     *
+     * @return A new MCP sync client
+     */
+    private McpSyncClient createClient() {
+        validateHttpUrl();
+
+        var requestBuilder = 
HttpRequest.newBuilder().timeout(Duration.ofSeconds(timeoutSeconds));
+
+        // Add custom headers
+        headers.forEach(requestBuilder::header);
+
+        // Apply authentication if configured
+        if (auth != null) {
+            auth.applyAuth(requestBuilder);
+        }
+
+        // Create transport based on type
+        var transport =
+                HttpClientStreamableHttpTransport.builder(endpoint)
+                        .requestBuilder(requestBuilder)
+                        .build();
+
+        // Build and initialize the client
+        var mcpClient =
+                McpClient.sync(transport)
+                        .requestTimeout(Duration.ofSeconds(timeoutSeconds))
+                        .build();
+
+        mcpClient.initialize();
+        return mcpClient;
+    }
+
+    /** Validate that the endpoint is a valid HTTP URL. */
+    private void validateHttpUrl() {
+        try {
+            URI uri = URI.create(endpoint);
+            String scheme = uri.getScheme();
+            if (scheme == null || (!scheme.equals("http") && 
!scheme.equals("https"))) {
+                throw new IllegalArgumentException(
+                        "Invalid HTTP endpoint: " + endpoint + ". Scheme must 
be http or https");
+            }
+            if (uri.getHost() == null || uri.getHost().isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Invalid HTTP endpoint: " + endpoint + ". Host cannot 
be empty");
+            }
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Invalid HTTP endpoint: " + 
endpoint, e);
+        }
+    }
+
+    /**
+     * List available tools from the MCP server.
+     *
+     * @return List of MCPTool instances
+     */
+    public List<MCPTool> listTools() {
+        McpSyncClient mcpClient = getClient();
+        McpSchema.ListToolsResult toolsResult = mcpClient.listTools();
+
+        List<MCPTool> tools = new ArrayList<>();
+        for (McpSchema.Tool toolData : toolsResult.tools()) {
+            ToolMetadata metadata =
+                    new ToolMetadata(
+                            toolData.name(),
+                            toolData.description() != null ? 
toolData.description() : "",
+                            serializeInputSchema(toolData.inputSchema()));
+
+            MCPTool tool = new MCPTool(metadata, this);
+            tools.add(tool);
+        }
+
+        return tools;
+    }
+
+    /**
+     * Get a specific tool by name.
+     *
+     * @param name The tool name
+     * @return The MCPTool instance
+     * @throws IllegalArgumentException if tool not found
+     */
+    public MCPTool getTool(String name) {
+        List<MCPTool> tools = listTools();
+        return tools.stream()
+                .filter(tool -> tool.getName().equals(name))
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        "Tool '"
+                                                + name
+                                                + "' not found on MCP server 
at "
+                                                + endpoint));
+    }
+
+    /**
+     * Get tool metadata by name.
+     *
+     * @param name The tool name
+     * @return The ToolMetadata
+     */
+    public ToolMetadata getToolMetadata(String name) {
+        return getTool(name).getMetadata();
+    }
+
+    /**
+     * Call a tool on the MCP server.
+     *
+     * @param toolName The name of the tool to call
+     * @param arguments The arguments to pass to the tool
+     * @return The result as a list of content items
+     */
+    public List<Object> callTool(String toolName, Map<String, Object> 
arguments) {
+        McpSyncClient mcpClient = getClient();
+        McpSchema.CallToolRequest request =
+                new McpSchema.CallToolRequest(
+                        toolName, arguments != null ? arguments : new 
HashMap<>());
+        McpSchema.CallToolResult result = mcpClient.callTool(request);
+
+        List<Object> content = new ArrayList<>();
+        for (var item : result.content()) {
+            content.add(MCPContentExtractor.extractContentItem(item));
+        }
+
+        return content;
+    }
+
+    /**
+     * List available prompts from the MCP server.
+     *
+     * @return List of MCPPrompt instances
+     */
+    public List<MCPPrompt> listPrompts() {
+        McpSyncClient mcpClient = getClient();
+        McpSchema.ListPromptsResult promptsResult = mcpClient.listPrompts();
+
+        List<MCPPrompt> prompts = new ArrayList<>();
+        for (McpSchema.Prompt promptData : promptsResult.prompts()) {
+            Map<String, MCPPrompt.PromptArgument> argumentsMap = new 
HashMap<>();
+            if (promptData.arguments() != null) {
+                for (var arg : promptData.arguments()) {
+                    argumentsMap.put(
+                            arg.name(),
+                            new MCPPrompt.PromptArgument(
+                                    arg.name(), arg.description(), 
arg.required()));
+                }
+            }
+
+            MCPPrompt prompt =
+                    new MCPPrompt(promptData.name(), promptData.description(), 
argumentsMap, this);
+            prompts.add(prompt);
+        }
+
+        return prompts;
+    }
+
+    /**
+     * Get a prompt by name with optional arguments.
+     *
+     * @param name The prompt name
+     * @param arguments Optional arguments for the prompt
+     * @return List of chat messages
+     */
+    public List<ChatMessage> getPrompt(String name, Map<String, Object> 
arguments) {
+        McpSyncClient mcpClient = getClient();
+        McpSchema.GetPromptRequest request =
+                new McpSchema.GetPromptRequest(
+                        name, arguments != null ? arguments : new HashMap<>());
+        McpSchema.GetPromptResult result = mcpClient.getPrompt(request);
+
+        List<ChatMessage> chatMessages = new ArrayList<>();
+        for (var message : result.messages()) {
+            if (message.content() instanceof McpSchema.TextContent) {
+                var textContent = (McpSchema.TextContent) message.content();

Review Comment:
   The idea show the variable 'textContent' can be replaced with pattern 
variable
   ```
   if (message.content() instanceof McpSchema.TextContent textContent) {
       MessageRole role = 
MessageRole.valueOf(message.role().name().toUpperCase());
       chatMessages.add(new ChatMessage(role, textContent.text()));
   }
   ```



##########
runtime/src/test/java/org/apache/flink/agents/runtime/mcp/MCPSerializationTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.mcp;
+
+import org.apache.flink.agents.api.mcp.MCPPrompt;
+import org.apache.flink.agents.api.mcp.MCPServer;
+import org.apache.flink.agents.api.mcp.MCPTool;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for MCP classes serialization with Flink's Kryo serializer.
+ *
+ * <p>This test ensures that MCP objects (MCPServer, MCPTool, MCPPrompt) can 
be properly serialized
+ * and deserialized by Flink's serialization framework, which uses Kryo under 
the hood. This is
+ * critical for distributed execution where these objects need to be sent 
across the network.
+ */
+class MCPSerializationTest {

Review Comment:
   Looks like this test doesn't depend on any runtime class, maybe we can move 
it to api module.
   
   This test looks like for testing the json serialization and deserialization 
for MCP, not Kryo serialization. The comments is confused.



##########
api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java:
##########
@@ -91,7 +92,7 @@ public ReActAgent(
                         "Output schema must be RowTypeInfo or Pojo class.");
             }
             Prompt schemaPrompt =
-                    new Prompt(
+                    new LocalPrompt(

Review Comment:
   Maybe we can use `Prompt.fromText` here, to avoid user be aware of 
`MCPPrompt` and `LocalPrompt`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to