section9-lab commented on code in PR #718: URL: https://github.com/apache/flink-agents/pull/718#discussion_r3332446070
########## integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java: ########## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.integrations.chatmodels.gemini; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.genai.Client; +import com.google.genai.types.Candidate; +import com.google.genai.types.Content; +import com.google.genai.types.FunctionCall; +import com.google.genai.types.FunctionDeclaration; +import com.google.genai.types.GenerateContentConfig; +import com.google.genai.types.GenerateContentResponse; +import com.google.genai.types.GenerateContentResponseUsageMetadata; +import com.google.genai.types.HttpOptions; +import com.google.genai.types.Part; +import com.google.genai.types.Tool; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.chat.model.BaseChatModelConnection; +import org.apache.flink.agents.api.resource.ResourceContext; 
+import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.tools.ToolMetadata; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A chat model integration for the Google Gemini {@code generateContent} API using the official + * google-genai Java SDK. + * + * <p>The native Gemini protocol differs from the OpenAI-compatible shape in a few places this + * module handles directly: + * + * <ul> + * <li>System messages are passed as a separate {@code systemInstruction}, not a system role. + * <li>Conversation roles are {@code user} and {@code model} (assistant maps to {@code model}). + * <li>Tool calls are returned as {@code functionCall} parts carrying a native {@code id} (there + * is no separate {@code tool_call_id}); tool results are sent back as {@code + * functionResponse} parts inside a {@code user} turn. + * </ul> + * + * <p>Supported connection parameters: + * + * <ul> + * <li><b>api_key</b> (optional): Gemini Developer API key. May be omitted when a local proxy + * injects the credential, but either {@code api_key} or {@code base_url} must be provided. + * <li><b>base_url</b> (optional): Custom endpoint, e.g. a local proxy such as {@code + * http://127.0.0.1:15721}. When set, requests are routed there instead of the default Google + * endpoint. + * <li><b>model</b> (optional): Default model name, used when no model is supplied per request. + * <li><b>timeout</b> (optional): Timeout in seconds for API requests. + * <li><b>vertex_ai</b> (optional): When true, use the Vertex AI backend together with {@code + * project} and {@code location}. + * <li><b>project</b> / <b>location</b> (optional): Vertex AI project id and location. 
+ * </ul> + * + * <p>Example usage: + * + * <pre>{@code + * public class MyAgent extends Agent { + * @ChatModelConnection + * public static ResourceDesc gemini() { + * return ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName()) + * .addInitialArgument("api_key", System.getenv("GEMINI_API_KEY")) + * .addInitialArgument("model", "gemini-3-pro-preview") + * .build(); + * } + * } + * }</pre> + */ +public class GeminiChatModelConnection extends BaseChatModelConnection { + + private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {}; + + private final ObjectMapper mapper = new ObjectMapper(); + private final Client client; + private final String defaultModel; + + public GeminiChatModelConnection( + ResourceDescriptor descriptor, ResourceContext resourceContext) { + super(descriptor, resourceContext); + + String apiKey = descriptor.getArgument("api_key"); + String baseUrl = descriptor.getArgument("base_url"); + Boolean vertexAi = descriptor.getArgument("vertex_ai"); + + boolean useVertex = Boolean.TRUE.equals(vertexAi); + if (!useVertex + && (apiKey == null || apiKey.isBlank()) + && (baseUrl == null || baseUrl.isBlank())) { + throw new IllegalArgumentException( + "Either api_key or base_url must be provided for the Gemini connection."); + } + + Client.Builder builder = Client.builder(); + if (!useVertex) { + // The SDK requires a non-blank API key for the Gemini Developer backend. When the + // caller relies on a proxy (base_url) to inject the real credential, supply a + // placeholder so the SDK's own validation passes; the proxy overrides it on the wire. 
+ if (apiKey != null && !apiKey.isBlank()) { + builder.apiKey(apiKey); + } else { + builder.apiKey("proxy-injected"); + } + } + + HttpOptions.Builder httpOptions = null; + if (baseUrl != null && !baseUrl.isBlank()) { + httpOptions = HttpOptions.builder().baseUrl(baseUrl); + } + Integer timeoutSeconds = descriptor.getArgument("timeout"); + if (timeoutSeconds != null && timeoutSeconds > 0) { + if (httpOptions == null) { + httpOptions = HttpOptions.builder(); + } + // HttpOptions timeout is expressed in milliseconds. + httpOptions.timeout(timeoutSeconds * 1000); + } + if (httpOptions != null) { + builder.httpOptions(httpOptions.build()); + } + + if (useVertex) { + builder.vertexAI(true); + String project = descriptor.getArgument("project"); + String location = descriptor.getArgument("location"); + if (project != null && !project.isBlank()) { + builder.project(project); + } + if (location != null && !location.isBlank()) { + builder.location(location); + } + } + + this.defaultModel = descriptor.getArgument("model"); + this.client = builder.build(); + } + + @Override + public void close() { + this.client.close(); + } + + @Override + public ChatMessage chat( + List<ChatMessage> messages, + List<org.apache.flink.agents.api.tools.Tool> tools, + Map<String, Object> arguments) { + try { + Map<String, Object> args = + arguments != null ? new HashMap<>(arguments) : new HashMap<>(); + + Object modelObj = args.remove("model"); + String modelName = modelObj != null ? 
modelObj.toString() : this.defaultModel; + if (modelName == null || modelName.isBlank()) { + modelName = this.defaultModel; + } + if (modelName == null || modelName.isBlank()) { + throw new IllegalArgumentException("model name must be provided for Gemini."); + } + + List<Content> contents = + messages.stream() + .filter(m -> m.getRole() != MessageRole.SYSTEM) + .map(this::convertToContent) + .collect(Collectors.toList()); + + GenerateContentConfig config = buildConfig(messages, tools, args); + + GenerateContentResponse response = + client.models.generateContent(modelName, contents, config); + ChatMessage result = convertResponse(response); + + recordUsage(result, modelName, response); + + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to call Gemini generateContent API.", e); + } + } + + private GenerateContentConfig buildConfig( + List<ChatMessage> messages, + List<org.apache.flink.agents.api.tools.Tool> tools, + Map<String, Object> arguments) { + GenerateContentConfig.Builder builder = GenerateContentConfig.builder(); + + Content systemInstruction = extractSystemInstruction(messages); + if (systemInstruction != null) { + builder.systemInstruction(systemInstruction); + } + + Object temperature = arguments.remove("temperature"); + if (temperature instanceof Number) { + builder.temperature(((Number) temperature).floatValue()); + } + + Object maxOutputTokens = arguments.remove("max_output_tokens"); + if (maxOutputTokens instanceof Number) { + builder.maxOutputTokens(((Number) maxOutputTokens).intValue()); + } + + if (tools != null && !tools.isEmpty()) { + builder.tools(List.of(convertTools(tools))); + } + + return builder.build(); + } + + private Tool convertTools(List<org.apache.flink.agents.api.tools.Tool> tools) { + List<FunctionDeclaration> declarations = new ArrayList<>(tools.size()); + for (org.apache.flink.agents.api.tools.Tool tool : tools) { + ToolMetadata metadata = tool.getMetadata(); + FunctionDeclaration.Builder builder = + 
FunctionDeclaration.builder() + .name(metadata.getName()) + .description(metadata.getDescription()); + + String schema = metadata.getInputSchema(); + if (schema != null && !schema.isBlank()) { + builder.parametersJsonSchema(parseSchema(schema)); + } + + declarations.add(builder.build()); + } + return Tool.builder().functionDeclarations(declarations).build(); + } + + private Content extractSystemInstruction(List<ChatMessage> messages) { + List<Part> parts = + messages.stream() + .filter(m -> m.getRole() == MessageRole.SYSTEM) + .map(m -> Part.fromText(Optional.ofNullable(m.getContent()).orElse(""))) + .collect(Collectors.toList()); + if (parts.isEmpty()) { + return null; + } + return Content.builder().parts(parts).build(); + } + + // Package-visible for unit testing of the message conversion. + Content convertToContent(ChatMessage message) { + MessageRole role = message.getRole(); + String content = Optional.ofNullable(message.getContent()).orElse(""); + + switch (role) { + case USER: + return Content.builder() + .role("user") + .parts(List.of(Part.fromText(content))) + .build(); + + case ASSISTANT: + List<Part> parts = new ArrayList<>(); + if (!content.isEmpty()) { + parts.add(Part.fromText(content)); + } + List<Map<String, Object>> toolCalls = message.getToolCalls(); + if (toolCalls != null) { + for (Map<String, Object> call : toolCalls) { + parts.add(convertToolCallToPart(call)); + } + } + if (parts.isEmpty()) { + parts.add(Part.fromText("")); + } + return Content.builder().role("model").parts(parts).build(); + + case TOOL: + Object name = message.getExtraArgs().get("name"); + if (name == null) { Review Comment: Good catch — fixed in ff6da88. Implemented exactly the lookup you suggested: `chat()` now scans prior ASSISTANT turns to build an `original_id → name` map (see `buildToolCallIdToNameMap`), and the TOOL branch in `convertToContent` resolves the function name via that map, keyed by the runtime-supplied `externalId`.
Explicit `name` in `extraArgs` still takes precedence as a safety net; the error is raised only when the name genuinely can't be resolved. Regarding your last paragraph: you weren't missing anything. The end-to-end test I ran drove the SDK directly with a pre-seeded `name` rather than going through `ChatModelAction`, which is exactly why this hadn't surfaced. Apologies for the gap. ########## integrations/chat-models/gemini/src/test/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnectionTest.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.agents.integrations.chatmodels.gemini; + +import com.google.genai.types.Content; +import com.google.genai.types.FunctionCall; +import com.google.genai.types.Part; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.model.BaseChatModelConnection; +import org.apache.flink.agents.api.resource.ResourceContext; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Base64; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link GeminiChatModelConnection}. These exercise the protocol-conversion logic + * with no network access, so they run in CI without any API key. + */ +class GeminiChatModelConnectionTest { + + private static final ResourceContext NOOP = ResourceContext.fromGetResource((a, b) -> null); + + private static ResourceDescriptor descriptor(String apiKey, String baseUrl, String model) { + ResourceDescriptor.Builder b = + ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName()); + if (apiKey != null) { + b.addInitialArgument("api_key", apiKey); + } + if (baseUrl != null) { + b.addInitialArgument("base_url", baseUrl); + } + if (model != null) { + b.addInitialArgument("model", model); + } + return b.build(); + } + + private static GeminiChatModelConnection connection() { + return new GeminiChatModelConnection( + descriptor("test-key", null, "gemini-3-pro-preview"), NOOP); + } + + @Test + @DisplayName("Constructor with api_key creates a connection") + void testConstructorWithApiKey() { + GeminiChatModelConnection conn = connection(); + assertThat(conn).isInstanceOf(BaseChatModelConnection.class); + } + + @Test + @DisplayName("Constructor with base_url (proxy) creates a connection without api_key") + 
void testConstructorWithBaseUrl() { + GeminiChatModelConnection conn = + new GeminiChatModelConnection( + descriptor(null, "http://127.0.0.1:15799", "gemini-3-pro-preview"), NOOP); + assertThat(conn).isInstanceOf(BaseChatModelConnection.class); + } + + @Test + @DisplayName("Constructor throws when neither api_key nor base_url is provided") + void testConstructorThrowsWithoutCredentials() { + assertThatThrownBy( + () -> + new GeminiChatModelConnection( + descriptor(null, null, "gemini-3-pro-preview"), NOOP)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("api_key or base_url"); + } + + @Test + @DisplayName("convertToContent maps USER role to a Gemini user turn") + void testConvertUserMessage() { + Content content = connection().convertToContent(ChatMessage.user("hello")); + assertThat(content.role()).hasValue("user"); + assertThat(content.parts().orElseThrow().get(0).text()).hasValue("hello"); + } + + @Test + @DisplayName("convertToContent maps ASSISTANT role to a Gemini model turn") + void testConvertAssistantMessage() { + Content content = connection().convertToContent(ChatMessage.assistant("hi there")); + assertThat(content.role()).hasValue("model"); + assertThat(content.parts().orElseThrow().get(0).text()).hasValue("hi there"); + } + + @Test + @DisplayName("convertToContent maps TOOL role to a functionResponse part") + void testConvertToolMessage() { + ChatMessage tool = ChatMessage.tool("sunny, 22C"); + tool.getExtraArgs().put("name", "get_weather"); Review Comment: Fixed in ff6da88. Removed `testConvertToolMessageWithoutName` (it was locking in the wrong contract) and added two tests in the shape you described: - `testRuntimeShapeToolMessageResolvesNameFromExternalId` — TOOL message with only `externalId` and a lookup map; asserts the resulting `functionResponse` carries the correct name. 
- `testRuntimeShapeMultiTurn` — full ASSISTANT (`{name, original_id}` from `convertFunctionCall`) → TOOL (only `externalId == original_id`) round-trip, asserting the recovered name on the model side. Also kept `testConvertToolMessageWithExplicitName` for the case where the caller does supply `name` directly, and added `testConvertToolMessageThrowsWhenUnresolvable` so the failure case is still locked down — but only when the name is genuinely missing. ########## integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java: ########## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.agents.integrations.chatmodels.gemini; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.genai.Client; +import com.google.genai.types.Candidate; +import com.google.genai.types.Content; +import com.google.genai.types.FunctionCall; +import com.google.genai.types.FunctionDeclaration; +import com.google.genai.types.GenerateContentConfig; +import com.google.genai.types.GenerateContentResponse; +import com.google.genai.types.GenerateContentResponseUsageMetadata; +import com.google.genai.types.HttpOptions; +import com.google.genai.types.Part; +import com.google.genai.types.Tool; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.chat.model.BaseChatModelConnection; +import org.apache.flink.agents.api.resource.ResourceContext; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.tools.ToolMetadata; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A chat model integration for the Google Gemini {@code generateContent} API using the official + * google-genai Java SDK. + * + * <p>The native Gemini protocol differs from the OpenAI-compatible shape in a few places this + * module handles directly: + * + * <ul> + * <li>System messages are passed as a separate {@code systemInstruction}, not a system role. + * <li>Conversation roles are {@code user} and {@code model} (assistant maps to {@code model}). 
+ * <li>Tool calls are returned as {@code functionCall} parts carrying a native {@code id} (there + * is no separate {@code tool_call_id}); tool results are sent back as {@code + * functionResponse} parts inside a {@code user} turn. + * </ul> + * + * <p>Supported connection parameters: + * + * <ul> + * <li><b>api_key</b> (optional): Gemini Developer API key. May be omitted when a local proxy + * injects the credential, but either {@code api_key} or {@code base_url} must be provided. + * <li><b>base_url</b> (optional): Custom endpoint, e.g. a local proxy such as {@code + * http://127.0.0.1:15721}. When set, requests are routed there instead of the default Google + * endpoint. + * <li><b>model</b> (optional): Default model name, used when no model is supplied per request. + * <li><b>timeout</b> (optional): Timeout in seconds for API requests. + * <li><b>vertex_ai</b> (optional): When true, use the Vertex AI backend together with {@code + * project} and {@code location}. + * <li><b>project</b> / <b>location</b> (optional): Vertex AI project id and location. 
+ * </ul> + * + * <p>Example usage: + * + * <pre>{@code + * public class MyAgent extends Agent { + * @ChatModelConnection + * public static ResourceDesc gemini() { + * return ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName()) + * .addInitialArgument("api_key", System.getenv("GEMINI_API_KEY")) + * .addInitialArgument("model", "gemini-3-pro-preview") + * .build(); + * } + * } + * }</pre> + */ +public class GeminiChatModelConnection extends BaseChatModelConnection { + + private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {}; + + private final ObjectMapper mapper = new ObjectMapper(); + private final Client client; + private final String defaultModel; + + public GeminiChatModelConnection( + ResourceDescriptor descriptor, ResourceContext resourceContext) { + super(descriptor, resourceContext); + + String apiKey = descriptor.getArgument("api_key"); + String baseUrl = descriptor.getArgument("base_url"); + Boolean vertexAi = descriptor.getArgument("vertex_ai"); + + boolean useVertex = Boolean.TRUE.equals(vertexAi); + if (!useVertex + && (apiKey == null || apiKey.isBlank()) + && (baseUrl == null || baseUrl.isBlank())) { + throw new IllegalArgumentException( + "Either api_key or base_url must be provided for the Gemini connection."); + } + + Client.Builder builder = Client.builder(); + if (!useVertex) { + // The SDK requires a non-blank API key for the Gemini Developer backend. When the + // caller relies on a proxy (base_url) to inject the real credential, supply a + // placeholder so the SDK's own validation passes; the proxy overrides it on the wire. 
+ if (apiKey != null && !apiKey.isBlank()) { + builder.apiKey(apiKey); + } else { + builder.apiKey("proxy-injected"); + } + } + + HttpOptions.Builder httpOptions = null; + if (baseUrl != null && !baseUrl.isBlank()) { + httpOptions = HttpOptions.builder().baseUrl(baseUrl); + } + Integer timeoutSeconds = descriptor.getArgument("timeout"); + if (timeoutSeconds != null && timeoutSeconds > 0) { + if (httpOptions == null) { + httpOptions = HttpOptions.builder(); + } + // HttpOptions timeout is expressed in milliseconds. + httpOptions.timeout(timeoutSeconds * 1000); + } + if (httpOptions != null) { + builder.httpOptions(httpOptions.build()); + } + + if (useVertex) { + builder.vertexAI(true); + String project = descriptor.getArgument("project"); + String location = descriptor.getArgument("location"); + if (project != null && !project.isBlank()) { + builder.project(project); + } + if (location != null && !location.isBlank()) { + builder.location(location); + } + } + + this.defaultModel = descriptor.getArgument("model"); + this.client = builder.build(); + } + + @Override + public void close() { + this.client.close(); + } + + @Override + public ChatMessage chat( + List<ChatMessage> messages, + List<org.apache.flink.agents.api.tools.Tool> tools, + Map<String, Object> arguments) { + try { + Map<String, Object> args = + arguments != null ? new HashMap<>(arguments) : new HashMap<>(); + + Object modelObj = args.remove("model"); + String modelName = modelObj != null ? 
modelObj.toString() : this.defaultModel; + if (modelName == null || modelName.isBlank()) { + modelName = this.defaultModel; + } + if (modelName == null || modelName.isBlank()) { + throw new IllegalArgumentException("model name must be provided for Gemini."); + } + + List<Content> contents = + messages.stream() + .filter(m -> m.getRole() != MessageRole.SYSTEM) + .map(this::convertToContent) + .collect(Collectors.toList()); + + GenerateContentConfig config = buildConfig(messages, tools, args); + + GenerateContentResponse response = + client.models.generateContent(modelName, contents, config); + ChatMessage result = convertResponse(response); + + recordUsage(result, modelName, response); + + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to call Gemini generateContent API.", e); + } + } + + private GenerateContentConfig buildConfig( + List<ChatMessage> messages, + List<org.apache.flink.agents.api.tools.Tool> tools, + Map<String, Object> arguments) { + GenerateContentConfig.Builder builder = GenerateContentConfig.builder(); + + Content systemInstruction = extractSystemInstruction(messages); + if (systemInstruction != null) { + builder.systemInstruction(systemInstruction); + } + + Object temperature = arguments.remove("temperature"); + if (temperature instanceof Number) { + builder.temperature(((Number) temperature).floatValue()); + } + + Object maxOutputTokens = arguments.remove("max_output_tokens"); Review Comment: Fixed in ff6da88. Added `applyAdditionalKwargs` mirroring the Anthropic connector: `top_k → topK`, `top_p → topP`, `stop_sequences → stopSequences`. One small wrinkle worth flagging — Gemini's `GenerateContentConfig.Builder` types both `topK` and `topP` as `Float` (verified via javap on 1.56.0), which is unusual for `topK` but matches the upstream protocol. Unknown keys are silently ignored rather than rejected, mirroring how the sibling connectors handle forward-compatible parameters. 
Two new tests cover the forwarding (`testApplyAdditionalKwargs`) and the ignore-unknown behavior. ########## integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java: ########## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.agents.integrations.chatmodels.gemini; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.genai.Client; +import com.google.genai.types.Candidate; +import com.google.genai.types.Content; +import com.google.genai.types.FunctionCall; +import com.google.genai.types.FunctionDeclaration; +import com.google.genai.types.GenerateContentConfig; +import com.google.genai.types.GenerateContentResponse; +import com.google.genai.types.GenerateContentResponseUsageMetadata; +import com.google.genai.types.HttpOptions; +import com.google.genai.types.Part; +import com.google.genai.types.Tool; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.chat.model.BaseChatModelConnection; +import org.apache.flink.agents.api.resource.ResourceContext; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.tools.ToolMetadata; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A chat model integration for the Google Gemini {@code generateContent} API using the official + * google-genai Java SDK. + * + * <p>The native Gemini protocol differs from the OpenAI-compatible shape in a few places this + * module handles directly: + * + * <ul> + * <li>System messages are passed as a separate {@code systemInstruction}, not a system role. + * <li>Conversation roles are {@code user} and {@code model} (assistant maps to {@code model}). 
+ * <li>Tool calls are returned as {@code functionCall} parts carrying a native {@code id} (there + * is no separate {@code tool_call_id}); tool results are sent back as {@code + * functionResponse} parts inside a {@code user} turn. + * </ul> + * + * <p>Supported connection parameters: + * + * <ul> + * <li><b>api_key</b> (optional): Gemini Developer API key. May be omitted when a local proxy + * injects the credential, but either {@code api_key} or {@code base_url} must be provided. + * <li><b>base_url</b> (optional): Custom endpoint, e.g. a local proxy such as {@code + * http://127.0.0.1:15721}. When set, requests are routed there instead of the default Google + * endpoint. + * <li><b>model</b> (optional): Default model name, used when no model is supplied per request. + * <li><b>timeout</b> (optional): Timeout in seconds for API requests. + * <li><b>vertex_ai</b> (optional): When true, use the Vertex AI backend together with {@code + * project} and {@code location}. + * <li><b>project</b> / <b>location</b> (optional): Vertex AI project id and location. 
+ * </ul> + * + * <p>Example usage: + * + * <pre>{@code + * public class MyAgent extends Agent { + * @ChatModelConnection + * public static ResourceDesc gemini() { + * return ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName()) + * .addInitialArgument("api_key", System.getenv("GEMINI_API_KEY")) + * .addInitialArgument("model", "gemini-3-pro-preview") + * .build(); + * } + * } + * }</pre> + */ +public class GeminiChatModelConnection extends BaseChatModelConnection { + + private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {}; + + private final ObjectMapper mapper = new ObjectMapper(); + private final Client client; + private final String defaultModel; + + public GeminiChatModelConnection( + ResourceDescriptor descriptor, ResourceContext resourceContext) { + super(descriptor, resourceContext); + + String apiKey = descriptor.getArgument("api_key"); + String baseUrl = descriptor.getArgument("base_url"); + Boolean vertexAi = descriptor.getArgument("vertex_ai"); + + boolean useVertex = Boolean.TRUE.equals(vertexAi); + if (!useVertex + && (apiKey == null || apiKey.isBlank()) + && (baseUrl == null || baseUrl.isBlank())) { + throw new IllegalArgumentException( + "Either api_key or base_url must be provided for the Gemini connection."); + } + + Client.Builder builder = Client.builder(); + if (!useVertex) { + // The SDK requires a non-blank API key for the Gemini Developer backend. When the + // caller relies on a proxy (base_url) to inject the real credential, supply a + // placeholder so the SDK's own validation passes; the proxy overrides it on the wire. 
+ if (apiKey != null && !apiKey.isBlank()) { + builder.apiKey(apiKey); + } else { + builder.apiKey("proxy-injected"); + } + } + + HttpOptions.Builder httpOptions = null; + if (baseUrl != null && !baseUrl.isBlank()) { + httpOptions = HttpOptions.builder().baseUrl(baseUrl); + } + Integer timeoutSeconds = descriptor.getArgument("timeout"); + if (timeoutSeconds != null && timeoutSeconds > 0) { + if (httpOptions == null) { + httpOptions = HttpOptions.builder(); + } + // HttpOptions timeout is expressed in milliseconds. + httpOptions.timeout(timeoutSeconds * 1000); + } + if (httpOptions != null) { + builder.httpOptions(httpOptions.build()); + } + + if (useVertex) { + builder.vertexAI(true); + String project = descriptor.getArgument("project"); + String location = descriptor.getArgument("location"); + if (project != null && !project.isBlank()) { + builder.project(project); + } + if (location != null && !location.isBlank()) { + builder.location(location); + } + } + + this.defaultModel = descriptor.getArgument("model"); + this.client = builder.build(); + } + + @Override + public void close() { + this.client.close(); + } + + @Override + public ChatMessage chat( + List<ChatMessage> messages, + List<org.apache.flink.agents.api.tools.Tool> tools, + Map<String, Object> arguments) { + try { + Map<String, Object> args = + arguments != null ? new HashMap<>(arguments) : new HashMap<>(); + + Object modelObj = args.remove("model"); + String modelName = modelObj != null ? 
modelObj.toString() : this.defaultModel; + if (modelName == null || modelName.isBlank()) { + modelName = this.defaultModel; + } + if (modelName == null || modelName.isBlank()) { + throw new IllegalArgumentException("model name must be provided for Gemini."); + } + + List<Content> contents = + messages.stream() + .filter(m -> m.getRole() != MessageRole.SYSTEM) + .map(this::convertToContent) + .collect(Collectors.toList()); + + GenerateContentConfig config = buildConfig(messages, tools, args); + + GenerateContentResponse response = + client.models.generateContent(modelName, contents, config); + ChatMessage result = convertResponse(response); + + recordUsage(result, modelName, response); + + return result; + } catch (Exception e) { Review Comment: Fixed in ff6da88. Did both of the things you suggested as a belt-and-braces approach: (1) moved the `model` validation out of the `try` block so the IAE is thrown at the same lexical point as the constructor's IAE, and (2) added an explicit `catch (IllegalArgumentException e) { throw e; }` clause before the generic catch, so any future validation IAEs from downstream callees (e.g. in `convertToContent` when a tool-result can't be resolved) also surface unwrapped instead of being re-wrapped into a generic `RuntimeException`. ########## integrations/chat-models/gemini/src/main/java/org/apache/flink/agents/integrations/chatmodels/gemini/GeminiChatModelConnection.java: ########## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.integrations.chatmodels.gemini; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.genai.Client; +import com.google.genai.types.Candidate; +import com.google.genai.types.Content; +import com.google.genai.types.FunctionCall; +import com.google.genai.types.FunctionDeclaration; +import com.google.genai.types.GenerateContentConfig; +import com.google.genai.types.GenerateContentResponse; +import com.google.genai.types.GenerateContentResponseUsageMetadata; +import com.google.genai.types.HttpOptions; +import com.google.genai.types.Part; +import com.google.genai.types.Tool; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.chat.model.BaseChatModelConnection; +import org.apache.flink.agents.api.resource.ResourceContext; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.tools.ToolMetadata; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A chat model integration for the Google Gemini {@code generateContent} API using the official + * google-genai Java SDK. 
+ * + * <p>The native Gemini protocol differs from the OpenAI-compatible shape in a few places this + * module handles directly: + * + * <ul> + * <li>System messages are passed as a separate {@code systemInstruction}, not a system role. + * <li>Conversation roles are {@code user} and {@code model} (assistant maps to {@code model}). + * <li>Tool calls are returned as {@code functionCall} parts carrying a native {@code id} (there + * is no separate {@code tool_call_id}); tool results are sent back as {@code + * functionResponse} parts inside a {@code user} turn. + * </ul> + * + * <p>Supported connection parameters: + * + * <ul> + * <li><b>api_key</b> (optional): Gemini Developer API key. May be omitted when a local proxy + * injects the credential, but either {@code api_key} or {@code base_url} must be provided. + * <li><b>base_url</b> (optional): Custom endpoint, e.g. a local proxy such as {@code + * http://127.0.0.1:15721}. When set, requests are routed there instead of the default Google + * endpoint. + * <li><b>model</b> (optional): Default model name, used when no model is supplied per request. + * <li><b>timeout</b> (optional): Timeout in seconds for API requests. + * <li><b>vertex_ai</b> (optional): When true, use the Vertex AI backend together with {@code + * project} and {@code location}. + * <li><b>project</b> / <b>location</b> (optional): Vertex AI project id and location. 
+ * </ul> + * + * <p>Example usage: + * + * <pre>{@code + * public class MyAgent extends Agent { + * @ChatModelConnection + * public static ResourceDesc gemini() { + * return ResourceDescriptor.Builder.newBuilder(GeminiChatModelConnection.class.getName()) + * .addInitialArgument("api_key", System.getenv("GEMINI_API_KEY")) + * .addInitialArgument("model", "gemini-3-pro-preview") + * .build(); + * } + * } + * }</pre> + */ +public class GeminiChatModelConnection extends BaseChatModelConnection { + + private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {}; + + private final ObjectMapper mapper = new ObjectMapper(); + private final Client client; + private final String defaultModel; + + public GeminiChatModelConnection( + ResourceDescriptor descriptor, ResourceContext resourceContext) { + super(descriptor, resourceContext); + + String apiKey = descriptor.getArgument("api_key"); + String baseUrl = descriptor.getArgument("base_url"); + Boolean vertexAi = descriptor.getArgument("vertex_ai"); + + boolean useVertex = Boolean.TRUE.equals(vertexAi); + if (!useVertex + && (apiKey == null || apiKey.isBlank()) + && (baseUrl == null || baseUrl.isBlank())) { + throw new IllegalArgumentException( + "Either api_key or base_url must be provided for the Gemini connection."); + } + + Client.Builder builder = Client.builder(); + if (!useVertex) { + // The SDK requires a non-blank API key for the Gemini Developer backend. When the + // caller relies on a proxy (base_url) to inject the real credential, supply a + // placeholder so the SDK's own validation passes; the proxy overrides it on the wire. 
+ if (apiKey != null && !apiKey.isBlank()) { + builder.apiKey(apiKey); + } else { + builder.apiKey("proxy-injected"); + } + } + + HttpOptions.Builder httpOptions = null; + if (baseUrl != null && !baseUrl.isBlank()) { + httpOptions = HttpOptions.builder().baseUrl(baseUrl); + } + Integer timeoutSeconds = descriptor.getArgument("timeout"); + if (timeoutSeconds != null && timeoutSeconds > 0) { + if (httpOptions == null) { + httpOptions = HttpOptions.builder(); + } + // HttpOptions timeout is expressed in milliseconds. + httpOptions.timeout(timeoutSeconds * 1000); + } + if (httpOptions != null) { + builder.httpOptions(httpOptions.build()); + } + + if (useVertex) { Review Comment: Took the smoke-test path in ff6da88, with a caveat. Added `testConstructorVertexAiIsWired` plus a javadoc note that the Vertex branch is wired and smoke-tested at construction but not e2e-tested. Caveat: the SDK eagerly resolves ADC at `Client.build()`, which means a CI box without ADC throws while a dev box with `gcloud auth` succeeds. The test accepts both outcomes — what it actually asserts is that the `vertex_ai` flag is **not** silently dropped (which would make the flag dead code). It's a weaker assertion than I'd like, but tightening it requires either an ADC fixture or mocking the SDK builder, both of which felt out of scope for this PR. Happy to take a stronger approach if you have a preference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
