weiqingy commented on code in PR #718:
URL: https://github.com/apache/flink-agents/pull/718#discussion_r3330876625


##########
integrations/chat-models/gemini/src/test/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnectionTest.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.chatmodels.gemini;
+
+import com.google.genai.types.Content;
+import com.google.genai.types.FunctionCall;
+import com.google.genai.types.Part;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link GeminiChatModelConnection}. These exercise the protocol-conversion logic
+ * with no network access, so they run in CI without any API key.
+ */
+class GeminiChatModelConnectionTest {
+
+    private static final ResourceContext NOOP = ResourceContext.fromGetResource((a, b) -> null);
+
+    private static ResourceDescriptor descriptor(String apiKey, String baseUrl, String model) {
+        ResourceDescriptor.Builder b =
+                ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName());
+        if (apiKey != null) {
+            b.addInitialArgument("api_key", apiKey);
+        }
+        if (baseUrl != null) {
+            b.addInitialArgument("base_url", baseUrl);
+        }
+        if (model != null) {
+            b.addInitialArgument("model", model);
+        }
+        return b.build();
+    }
+
+    private static GeminiChatModelConnection connection() {
+        return new GeminiChatModelConnection(
+                descriptor("test-key", null, "gemini-3-pro-preview"), NOOP);
+    }
+
+    @Test
+    @DisplayName("Constructor with api_key creates a connection")
+    void testConstructorWithApiKey() {
+        GeminiChatModelConnection conn = connection();
+        assertThat(conn).isInstanceOf(BaseChatModelConnection.class);
+    }
+
+    @Test
+    @DisplayName("Constructor with base_url (proxy) creates a connection without api_key")
+    void testConstructorWithBaseUrl() {
+        GeminiChatModelConnection conn =
+                new GeminiChatModelConnection(
+                        descriptor(null, "http://127.0.0.1:15799", "gemini-3-pro-preview"), NOOP);
+        assertThat(conn).isInstanceOf(BaseChatModelConnection.class);
+    }
+
+    @Test
+    @DisplayName("Constructor throws when neither api_key nor base_url is provided")
+    void testConstructorThrowsWithoutCredentials() {
+        assertThatThrownBy(
+                        () ->
+                                new GeminiChatModelConnection(
+                                        descriptor(null, null, "gemini-3-pro-preview"), NOOP))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("api_key or base_url");
+    }
+
+    @Test
+    @DisplayName("convertToContent maps USER role to a Gemini user turn")
+    void testConvertUserMessage() {
+        Content content = connection().convertToContent(ChatMessage.user("hello"));
+        assertThat(content.role()).hasValue("user");
+        assertThat(content.parts().orElseThrow().get(0).text()).hasValue("hello");
+    }
+
+    @Test
+    @DisplayName("convertToContent maps ASSISTANT role to a Gemini model turn")
+    void testConvertAssistantMessage() {
+        Content content = connection().convertToContent(ChatMessage.assistant("hi there"));
+        assertThat(content.role()).hasValue("model");
+        assertThat(content.parts().orElseThrow().get(0).text()).hasValue("hi there");
+    }
+
+    @Test
+    @DisplayName("convertToContent maps TOOL role to a functionResponse part")
+    void testConvertToolMessage() {
+        ChatMessage tool = ChatMessage.tool("sunny, 22C");
+        tool.getExtraArgs().put("name", "get_weather");

Review Comment:
   This line hand-injects `name` into `extraArgs`, but the production runtime 
never does — it supplies only `externalId`. Paired with 
`testConvertToolMessageWithoutName` (line 124) asserting the throw path, the 
suite codifies the broken behavior as the contract: both tests pass against a 
message shape the runtime never emits, so a green suite gives false confidence 
that runtime tool-calling works (this is what masks the TOOL `name` issue at 
`GeminiChatModelConnection.java:294`).
   
   Would a realistic two-turn test help pin the real contract? Something like, 
if it's useful: an ASSISTANT message whose `toolCalls` carry `{name, 
original_id}` as produced by `convertFunctionCall`, followed by a TOOL message 
carrying only `externalId` (== that `original_id`, no `name`), then assert 
`convertToContent`/`chat` produces a `functionResponse` with the correct name. 
A test in that shape would lock the runtime contract in place once the 
resolution above lands — happy to be redirected if you see a simpler way to 
cover it.
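
   As a rough illustration of the lookup such a two-turn test would pin down, here is a minimal sketch using stand-in types. `ToolNameResolutionSketch`, `Msg`, and `resolveToolName` are hypothetical names, not the flink-agents or google-genai classes, and placing `externalId` in `extraArgs` is an assumption taken from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: stand-in types, not the flink-agents or google-genai classes. */
final class ToolNameResolutionSketch {

    /** Hypothetical stand-in for a chat message with a role, tool calls and extra args. */
    static final class Msg {
        final String role;
        final List<Map<String, Object>> toolCalls = new ArrayList<>();
        final Map<String, Object> extraArgs = new HashMap<>();

        Msg(String role) {
            this.role = role;
        }
    }

    /**
     * Resolves the function name for a TOOL message by matching its externalId
     * against the original_id of tool calls on earlier ASSISTANT messages.
     */
    static String resolveToolName(List<Msg> history, Msg toolMessage) {
        Object externalId = toolMessage.extraArgs.get("externalId");
        if (externalId == null) {
            throw new IllegalArgumentException("TOOL message has no externalId.");
        }
        for (Msg message : history) {
            if (!"assistant".equals(message.role)) {
                continue; // only assistant turns carry tool calls
            }
            for (Map<String, Object> call : message.toolCalls) {
                if (externalId.equals(call.get("original_id"))) {
                    return String.valueOf(call.get("name"));
                }
            }
        }
        throw new IllegalArgumentException("No tool call matches externalId " + externalId);
    }
}
```

   A test in this shape would build the ASSISTANT turn first, then a TOOL turn carrying only `externalId`, and assert on the resolved name rather than on a hand-injected one.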



##########
integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java:
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integrations.chatmodels.gemini;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.genai.Client;
+import com.google.genai.types.Candidate;
+import com.google.genai.types.Content;
+import com.google.genai.types.FunctionCall;
+import com.google.genai.types.FunctionDeclaration;
+import com.google.genai.types.GenerateContentConfig;
+import com.google.genai.types.GenerateContentResponse;
+import com.google.genai.types.GenerateContentResponseUsageMetadata;
+import com.google.genai.types.HttpOptions;
+import com.google.genai.types.Part;
+import com.google.genai.types.Tool;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A chat model integration for the Google Gemini {@code generateContent} API using the official
+ * google-genai Java SDK.
+ *
+ * <p>The native Gemini protocol differs from the OpenAI-compatible shape in a few places this
+ * module handles directly:
+ *
+ * <ul>
+ *   <li>System messages are passed as a separate {@code systemInstruction}, not a system role.
+ *   <li>Conversation roles are {@code user} and {@code model} (assistant maps to {@code model}).
+ *   <li>Tool calls are returned as {@code functionCall} parts carrying a native {@code id} (there
+ *       is no separate {@code tool_call_id}); tool results are sent back as {@code
+ *       functionResponse} parts inside a {@code user} turn.
+ * </ul>
+ *
+ * <p>Supported connection parameters:
+ *
+ * <ul>
+ *   <li><b>api_key</b> (optional): Gemini Developer API key. May be omitted when a local proxy
+ *       injects the credential, but either {@code api_key} or {@code base_url} must be provided.
+ *   <li><b>base_url</b> (optional): Custom endpoint, e.g. a local proxy such as {@code
+ *       http://127.0.0.1:15721}. When set, requests are routed there instead of the default Google
+ *       endpoint.
+ *   <li><b>model</b> (optional): Default model name, used when no model is supplied per request.
+ *   <li><b>timeout</b> (optional): Timeout in seconds for API requests.
+ *   <li><b>vertex_ai</b> (optional): When true, use the Vertex AI backend together with {@code
+ *       project} and {@code location}.
+ *   <li><b>project</b> / <b>location</b> (optional): Vertex AI project id and location.
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class MyAgent extends Agent {
+ *   @ChatModelConnection
+ *   public static ResourceDesc gemini() {
+ *     return ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName())
+ *             .addInitialArgument("api_key", System.getenv("GEMINI_API_KEY"))
+ *             .addInitialArgument("model", "gemini-3-pro-preview")
+ *             .build();
+ *   }
+ * }
+ * }</pre>
+ */
+public class GeminiChatModelConnection extends BaseChatModelConnection {
+
+    private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final Client client;
+    private final String defaultModel;
+
+    public GeminiChatModelConnection(
+            ResourceDescriptor descriptor, ResourceContext resourceContext) {
+        super(descriptor, resourceContext);
+
+        String apiKey = descriptor.getArgument("api_key");
+        String baseUrl = descriptor.getArgument("base_url");
+        Boolean vertexAi = descriptor.getArgument("vertex_ai");
+
+        boolean useVertex = Boolean.TRUE.equals(vertexAi);
+        if (!useVertex
+                && (apiKey == null || apiKey.isBlank())
+                && (baseUrl == null || baseUrl.isBlank())) {
+            throw new IllegalArgumentException(
+                    "Either api_key or base_url must be provided for the Gemini connection.");
+        }
+
+        Client.Builder builder = Client.builder();
+        if (!useVertex) {
+            // The SDK requires a non-blank API key for the Gemini Developer backend. When the
+            // caller relies on a proxy (base_url) to inject the real credential, supply a
+            // placeholder so the SDK's own validation passes; the proxy overrides it on the wire.
+            if (apiKey != null && !apiKey.isBlank()) {
+                builder.apiKey(apiKey);
+            } else {
+                builder.apiKey("proxy-injected");
+            }
+        }
+
+        HttpOptions.Builder httpOptions = null;
+        if (baseUrl != null && !baseUrl.isBlank()) {
+            httpOptions = HttpOptions.builder().baseUrl(baseUrl);
+        }
+        Integer timeoutSeconds = descriptor.getArgument("timeout");
+        if (timeoutSeconds != null && timeoutSeconds > 0) {
+            if (httpOptions == null) {
+                httpOptions = HttpOptions.builder();
+            }
+            // HttpOptions timeout is expressed in milliseconds.
+            httpOptions.timeout(timeoutSeconds * 1000);
+        }
+        if (httpOptions != null) {
+            builder.httpOptions(httpOptions.build());
+        }
+
+        if (useVertex) {
+            builder.vertexAI(true);
+            String project = descriptor.getArgument("project");
+            String location = descriptor.getArgument("location");
+            if (project != null && !project.isBlank()) {
+                builder.project(project);
+            }
+            if (location != null && !location.isBlank()) {
+                builder.location(location);
+            }
+        }
+
+        this.defaultModel = descriptor.getArgument("model");
+        this.client = builder.build();
+    }
+
+    @Override
+    public void close() {
+        this.client.close();
+    }
+
+    @Override
+    public ChatMessage chat(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> arguments) {
+        try {
+            Map<String, Object> args =
+                    arguments != null ? new HashMap<>(arguments) : new HashMap<>();
+
+            Object modelObj = args.remove("model");
+            String modelName = modelObj != null ? modelObj.toString() : this.defaultModel;
+            if (modelName == null || modelName.isBlank()) {
+                modelName = this.defaultModel;
+            }
+            if (modelName == null || modelName.isBlank()) {
+                throw new IllegalArgumentException("model name must be provided for Gemini.");
+            }
+
+            List<Content> contents =
+                    messages.stream()
+                            .filter(m -> m.getRole() != MessageRole.SYSTEM)
+                            .map(this::convertToContent)
+                            .collect(Collectors.toList());
+
+            GenerateContentConfig config = buildConfig(messages, tools, args);
+
+            GenerateContentResponse response =
+                    client.models.generateContent(modelName, contents, config);
+            ChatMessage result = convertResponse(response);
+
+            recordUsage(result, modelName, response);
+
+            return result;
+        } catch (Exception e) {

Review Comment:
   The model-name validation `IllegalArgumentException` at line 182 is thrown 
inside this try, so it gets re-wrapped into a generic `RuntimeException` — 
while the constructor throws `IllegalArgumentException` directly (line 114). A 
caller catching `IllegalArgumentException` to distinguish validation errors 
will catch the constructor case but not the in-`chat` model-name case. Is there 
a reason to keep the contract asymmetric across the two entry points, or would 
validating the model name before the try (or rethrowing validation IAEs 
unwrapped) make it consistent?



##########
integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java:
##########
@@ -0,0 +1,420 @@
+            throw new RuntimeException("Failed to call Gemini generateContent API.", e);
+        }
+    }
+
+    private GenerateContentConfig buildConfig(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> arguments) {
+        GenerateContentConfig.Builder builder = GenerateContentConfig.builder();
+
+        Content systemInstruction = extractSystemInstruction(messages);
+        if (systemInstruction != null) {
+            builder.systemInstruction(systemInstruction);
+        }
+
+        Object temperature = arguments.remove("temperature");
+        if (temperature instanceof Number) {
+            builder.temperature(((Number) temperature).floatValue());
+        }
+
+        Object maxOutputTokens = arguments.remove("max_output_tokens");

Review Comment:
   `buildConfig` reads `temperature` and `max_output_tokens`, but 
`additional_kwargs` (top_k, top_p, …) is never read here — even though 
`GeminiChatModelSetup` documents and forwards it via `getParameters()`. The net 
effect is a user who sets `top_k`/`top_p` through the documented 
`additional_kwargs` path sees them silently ignored: no error, just wrong 
sampling behavior. `AnthropicChatModelConnection.java:212-217` removes and 
applies `additional_kwargs` via `applyAdditionalKwargs`. Would mirroring that — 
reading `additional_kwargs` and mapping known keys (top_k → topK, top_p → topP) 
onto the builder — be worth doing? `GenerateContentConfig.Builder` exposes 
`topK`/`topP` in 1.56.0.
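
   A rough sketch of the mapping, assuming `additional_kwargs` arrives as a nested map under that key. `AdditionalKwargsSketch` and `SamplingConfig` are hypothetical stand-ins for the real config builder, and the `Float` field types are an assumption; the actual `topK`/`topP` parameter types should be checked against the SDK.

```java
import java.util.Map;

/** Illustration only: SamplingConfig is a stand-in for the SDK's config builder. */
final class AdditionalKwargsSketch {

    /** Hypothetical holder for the sampling parameters that would go onto the builder. */
    static final class SamplingConfig {
        Float topK;
        Float topP;
    }

    /** Removes additional_kwargs from the arguments and maps known keys onto the config. */
    static SamplingConfig applyAdditionalKwargs(Map<String, Object> arguments) {
        SamplingConfig config = new SamplingConfig();
        Object raw = arguments.remove("additional_kwargs");
        if (!(raw instanceof Map)) {
            return config; // nothing to apply
        }
        Map<?, ?> kwargs = (Map<?, ?>) raw;
        Object topK = kwargs.get("top_k");
        if (topK instanceof Number) {
            config.topK = ((Number) topK).floatValue();
        }
        Object topP = kwargs.get("top_p");
        if (topP instanceof Number) {
            config.topP = ((Number) topP).floatValue();
        }
        return config;
    }
}
```

   Unknown keys are dropped silently in this sketch; logging or rejecting them would make a misspelled parameter visible instead of ignored.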



##########
integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java:
##########
@@ -0,0 +1,420 @@
+        if (useVertex) {

Review Comment:
   The Vertex AI constructor branch (and the `vertex_ai`/`project`/`location` 
params the javadoc advertises) has no test, unlike the other constructor paths. 
Worth a construction smoke test if Vertex is in scope for #648, or a note that 
it's a documented-but-untested follow-up?



##########
integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java:
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integrations.chatmodels.gemini;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.genai.Client;
+import com.google.genai.types.Candidate;
+import com.google.genai.types.Content;
+import com.google.genai.types.FunctionCall;
+import com.google.genai.types.FunctionDeclaration;
+import com.google.genai.types.GenerateContentConfig;
+import com.google.genai.types.GenerateContentResponse;
+import com.google.genai.types.GenerateContentResponseUsageMetadata;
+import com.google.genai.types.HttpOptions;
+import com.google.genai.types.Part;
+import com.google.genai.types.Tool;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A chat model integration for the Google Gemini {@code generateContent} API using the official
+ * google-genai Java SDK.
+ *
+ * <p>The native Gemini protocol differs from the OpenAI-compatible shape in a few places this
+ * module handles directly:
+ *
+ * <ul>
+ *   <li>System messages are passed as a separate {@code systemInstruction}, not a system role.
+ *   <li>Conversation roles are {@code user} and {@code model} (assistant maps to {@code model}).
+ *   <li>Tool calls are returned as {@code functionCall} parts carrying a native {@code id} (there
+ *       is no separate {@code tool_call_id}); tool results are sent back as {@code
+ *       functionResponse} parts inside a {@code user} turn.
+ * </ul>
+ *
+ * <p>Supported connection parameters:
+ *
+ * <ul>
+ *   <li><b>api_key</b> (optional): Gemini Developer API key. May be omitted when a local proxy
+ *       injects the credential, but either {@code api_key} or {@code base_url} must be provided.
+ *   <li><b>base_url</b> (optional): Custom endpoint, e.g. a local proxy such as {@code
+ *       http://127.0.0.1:15721}. When set, requests are routed there instead of the default Google
+ *       endpoint.
+ *   <li><b>model</b> (optional): Default model name, used when no model is supplied per request.
+ *   <li><b>timeout</b> (optional): Timeout in seconds for API requests.
+ *   <li><b>vertex_ai</b> (optional): When true, use the Vertex AI backend together with {@code
+ *       project} and {@code location}.
+ *   <li><b>project</b> / <b>location</b> (optional): Vertex AI project id and location.
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class MyAgent extends Agent {
+ *   @ChatModelConnection
+ *   public static ResourceDesc gemini() {
+ *     return ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName())
+ *             .addInitialArgument("api_key", System.getenv("GEMINI_API_KEY"))
+ *             .addInitialArgument("model", "gemini-3-pro-preview")
+ *             .build();
+ *   }
+ * }
+ * }</pre>
+ */
+public class GeminiChatModelConnection extends BaseChatModelConnection {
+
+    private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final Client client;
+    private final String defaultModel;
+
+    public GeminiChatModelConnection(
+            ResourceDescriptor descriptor, ResourceContext resourceContext) {
+        super(descriptor, resourceContext);
+
+        String apiKey = descriptor.getArgument("api_key");
+        String baseUrl = descriptor.getArgument("base_url");
+        Boolean vertexAi = descriptor.getArgument("vertex_ai");
+
+        boolean useVertex = Boolean.TRUE.equals(vertexAi);
+        if (!useVertex
+                && (apiKey == null || apiKey.isBlank())
+                && (baseUrl == null || baseUrl.isBlank())) {
+            throw new IllegalArgumentException(
+                    "Either api_key or base_url must be provided for the Gemini connection.");
+        }
+
+        Client.Builder builder = Client.builder();
+        if (!useVertex) {
+            // The SDK requires a non-blank API key for the Gemini Developer backend. When the
+            // caller relies on a proxy (base_url) to inject the real credential, supply a
+            // placeholder so the SDK's own validation passes; the proxy overrides it on the wire.
+            if (apiKey != null && !apiKey.isBlank()) {
+                builder.apiKey(apiKey);
+            } else {
+                builder.apiKey("proxy-injected");
+            }
+        }
+
+        HttpOptions.Builder httpOptions = null;
+        if (baseUrl != null && !baseUrl.isBlank()) {
+            httpOptions = HttpOptions.builder().baseUrl(baseUrl);
+        }
+        Integer timeoutSeconds = descriptor.getArgument("timeout");
+        if (timeoutSeconds != null && timeoutSeconds > 0) {
+            if (httpOptions == null) {
+                httpOptions = HttpOptions.builder();
+            }
+            // HttpOptions timeout is expressed in milliseconds.
+            httpOptions.timeout(timeoutSeconds * 1000);
+        }
+        if (httpOptions != null) {
+            builder.httpOptions(httpOptions.build());
+        }
+
+        if (useVertex) {
+            builder.vertexAI(true);
+            String project = descriptor.getArgument("project");
+            String location = descriptor.getArgument("location");
+            if (project != null && !project.isBlank()) {
+                builder.project(project);
+            }
+            if (location != null && !location.isBlank()) {
+                builder.location(location);
+            }
+        }
+
+        this.defaultModel = descriptor.getArgument("model");
+        this.client = builder.build();
+    }
+
+    @Override
+    public void close() {
+        this.client.close();
+    }
+
+    @Override
+    public ChatMessage chat(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> arguments) {
+        try {
+            Map<String, Object> args =
+                    arguments != null ? new HashMap<>(arguments) : new HashMap<>();
+
+            Object modelObj = args.remove("model");
+            String modelName = modelObj != null ? modelObj.toString() : this.defaultModel;
+            if (modelName == null || modelName.isBlank()) {
+                modelName = this.defaultModel;
+            }
+            if (modelName == null || modelName.isBlank()) {
+                throw new IllegalArgumentException("model name must be provided for Gemini.");
+            }
+
+            List<Content> contents =
+                    messages.stream()
+                            .filter(m -> m.getRole() != MessageRole.SYSTEM)
+                            .map(this::convertToContent)
+                            .collect(Collectors.toList());
+
+            GenerateContentConfig config = buildConfig(messages, tools, args);
+
+            GenerateContentResponse response =
+                    client.models.generateContent(modelName, contents, config);
+            ChatMessage result = convertResponse(response);
+
+            recordUsage(result, modelName, response);
+
+            return result;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to call Gemini generateContent API.", e);
+        }
+    }
+
+    private GenerateContentConfig buildConfig(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> arguments) {
+        GenerateContentConfig.Builder builder = GenerateContentConfig.builder();
+
+        Content systemInstruction = extractSystemInstruction(messages);
+        if (systemInstruction != null) {
+            builder.systemInstruction(systemInstruction);
+        }
+
+        Object temperature = arguments.remove("temperature");
+        if (temperature instanceof Number) {
+            builder.temperature(((Number) temperature).floatValue());
+        }
+
+        Object maxOutputTokens = arguments.remove("max_output_tokens");
+        if (maxOutputTokens instanceof Number) {
+            builder.maxOutputTokens(((Number) maxOutputTokens).intValue());
+        }
+
+        if (tools != null && !tools.isEmpty()) {
+            builder.tools(List.of(convertTools(tools)));
+        }
+
+        return builder.build();
+    }
+
+    private Tool convertTools(List<org.apache.flink.agents.api.tools.Tool> tools) {
+        List<FunctionDeclaration> declarations = new ArrayList<>(tools.size());
+        for (org.apache.flink.agents.api.tools.Tool tool : tools) {
+            ToolMetadata metadata = tool.getMetadata();
+            FunctionDeclaration.Builder builder =
+                    FunctionDeclaration.builder()
+                            .name(metadata.getName())
+                            .description(metadata.getDescription());
+
+            String schema = metadata.getInputSchema();
+            if (schema != null && !schema.isBlank()) {
+                builder.parametersJsonSchema(parseSchema(schema));
+            }
+
+            declarations.add(builder.build());
+        }
+        return Tool.builder().functionDeclarations(declarations).build();
+    }
+
+    private Content extractSystemInstruction(List<ChatMessage> messages) {
+        List<Part> parts =
+                messages.stream()
+                        .filter(m -> m.getRole() == MessageRole.SYSTEM)
+                        .map(m -> Part.fromText(Optional.ofNullable(m.getContent()).orElse("")))
+                        .collect(Collectors.toList());
+        if (parts.isEmpty()) {
+            return null;
+        }
+        return Content.builder().parts(parts).build();
+    }
+
+    // Package-visible for unit testing of the message conversion.
+    Content convertToContent(ChatMessage message) {
+        MessageRole role = message.getRole();
+        String content = Optional.ofNullable(message.getContent()).orElse("");
+
+        switch (role) {
+            case USER:
+                return Content.builder()
+                        .role("user")
+                        .parts(List.of(Part.fromText(content)))
+                        .build();
+
+            case ASSISTANT:
+                List<Part> parts = new ArrayList<>();
+                if (!content.isEmpty()) {
+                    parts.add(Part.fromText(content));
+                }
+                List<Map<String, Object>> toolCalls = message.getToolCalls();
+                if (toolCalls != null) {
+                    for (Map<String, Object> call : toolCalls) {
+                        parts.add(convertToolCallToPart(call));
+                    }
+                }
+                if (parts.isEmpty()) {
+                    parts.add(Part.fromText(""));
+                }
+                return Content.builder().role("model").parts(parts).build();
+
+            case TOOL:
+                Object name = message.getExtraArgs().get("name");
+                if (name == null) {

Review Comment:
   The TOOL branch requires `name` in `extraArgs`, but the runtime never 
supplies it. The only producer of TOOL messages is `ChatModelAction.java` 
(~line 476), which populates only `externalId`, and every sibling connector 
reads `externalId` too (`AnthropicChatModelConnection.java:271`, 
`OpenAIChatCompletionsUtils.java:96`). As a result, any multi-turn 
tool-calling agent driven through the Flink runtime throws `Tool message must 
have a 'name' in extraArgs for Gemini` the moment a tool result is replayed. 
Single-shot chat and the first tool request work, but the tool-result turn 
fails, and function calling is the headline of #648.
   
   The connector already has what it needs to resolve this without the hard 
requirement: `ToolCallAction` sets `externalId == original_id`, and this 
connector writes both `name` and `original_id` onto the assistant turn's 
tool-call map in `convertFunctionCall` (lines 384, 378). `chat()` receives the 
full `messages` list including that prior ASSISTANT turn. Would building an 
`original_id → name` map from the assistant turns' `toolCalls` and looking up 
by the TOOL message's `externalId` work here? `Part.fromFunctionResponse(name, 
map)` needs the function name specifically (it exists in 1.56.0), which is why 
`externalId` alone isn't enough and the name-resolution step is needed. Keeping 
a clear error only when the name genuinely can't be resolved would preserve the 
safety net.
   
   One thing I'd love to understand: does the multi-turn tool-result e2e run 
through `ChatModelAction`, or through a harness that pre-seeds `name`? If it's 
the runtime path, I'd have expected this to surface, so I'm probably missing 
something about how the e2e is wired and would be glad to be corrected here.
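   
   To make the suggestion concrete, here is a minimal sketch of the 
name-resolution step. It is illustrative only: `ToolNameResolver`, 
`indexToolNames`, and `resolveName` are hypothetical names, plain maps stand in 
for `ChatMessage.getToolCalls()` and `getExtraArgs()`, and it assumes the 
tool-call map exposes `original_id` and `name` as top-level keys, which may not 
match the actual layout written by `convertFunctionCall`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the PR or the flink-agents API. */
public final class ToolNameResolver {

    private ToolNameResolver() {}

    /**
     * Indexes function names by original_id across the tool-call maps of prior assistant turns.
     * Assumption: each map exposes "original_id" and "name" as top-level keys.
     */
    public static Map<String, String> indexToolNames(List<Map<String, Object>> assistantToolCalls) {
        Map<String, String> namesById = new HashMap<>();
        for (Map<String, Object> call : assistantToolCalls) {
            Object id = call.get("original_id");
            Object name = call.get("name");
            if (id != null && name != null) {
                namesById.put(id.toString(), name.toString());
            }
        }
        return namesById;
    }

    /**
     * Resolves the function name for a TOOL message from its extraArgs. An explicit "name" wins;
     * otherwise "externalId" is looked up in the index. Fails only when neither yields a name.
     */
    public static String resolveName(Map<String, String> namesById, Map<String, Object> extraArgs) {
        Object explicit = extraArgs.get("name");
        if (explicit != null) {
            return explicit.toString();
        }
        Object externalId = extraArgs.get("externalId");
        String name = externalId == null ? null : namesById.get(externalId.toString());
        if (name == null) {
            throw new IllegalArgumentException(
                    "Cannot resolve function name for tool message with externalId=" + externalId);
        }
        return name;
    }

    public static void main(String[] args) {
        // Stand-in for the tool-call map on a prior ASSISTANT turn.
        Map<String, Object> call = new HashMap<>();
        call.put("original_id", "call-1");
        call.put("name", "get_weather");
        Map<String, String> namesById = indexToolNames(List.of(call));

        // Stand-in for the extraArgs of a TOOL message produced by the runtime.
        Map<String, Object> extraArgs = new HashMap<>();
        extraArgs.put("externalId", "call-1");
        System.out.println(resolveName(namesById, extraArgs)); // prints get_weather
    }
}
```

   In the connector, the index would presumably be built once per `chat()` 
call from the ASSISTANT messages and consulted in the TOOL branch before 
calling `Part.fromFunctionResponse(name, map)`. Preferring an explicit `name` 
keeps any harness that pre-seeds it working unchanged.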



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
