This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new a0df9b68 [integration] Add Amazon Bedrock chat model and embedding 
model integrations (#534)
a0df9b68 is described below

commit a0df9b68357ddf517a0f0ba68d24d2b5355a9805
Author: Avichay Marciano <[email protected]>
AuthorDate: Thu Mar 5 09:45:12 2026 +0200

    [integration] Add Amazon Bedrock chat model and embedding model 
integrations (#534)
    
    - BedrockChatModelConnection: Converse API with native tool calling, SigV4 
auth
    - BedrockChatModelSetup: model, temperature, max_tokens configuration
    - BedrockEmbeddingModelConnection: Titan Text Embeddings V2 with parallel 
batch embedding
    - BedrockEmbeddingModelSetup: model, dimensions configuration
    - Retry via unified RetryExecutor with Bedrock-specific retryable predicate
    - stripMarkdownFences for non-tool-call responses (lossless fence stripping 
only)
---
 dist/pom.xml                                       |  10 +
 integrations/{ => chat-models/bedrock}/pom.xml     |  33 +-
 .../bedrock/BedrockChatModelConnection.java        | 427 +++++++++++++++++++++
 .../chatmodels/bedrock/BedrockChatModelSetup.java  |  89 +++++
 .../bedrock/BedrockChatModelConnectionTest.java    | 124 ++++++
 .../bedrock/BedrockChatModelSetupTest.java         |  77 ++++
 integrations/chat-models/pom.xml                   |   1 +
 .../{ => embedding-models/bedrock}/pom.xml         |  33 +-
 .../bedrock/BedrockEmbeddingModelConnection.java   | 188 +++++++++
 .../bedrock/BedrockEmbeddingModelSetup.java        |  80 ++++
 .../bedrock/BedrockEmbeddingModelTest.java         |  99 +++++
 integrations/embedding-models/pom.xml              |   1 +
 integrations/pom.xml                               |   1 +
 13 files changed, 1131 insertions(+), 32 deletions(-)

diff --git a/dist/pom.xml b/dist/pom.xml
index 8199eaa3..b4e479a8 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -79,11 +79,21 @@ under the License.
             
<artifactId>flink-agents-integrations-chat-models-azureai</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-chat-models-bedrock</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-embedding-models-bedrock</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
diff --git a/integrations/pom.xml b/integrations/chat-models/bedrock/pom.xml
similarity index 62%
copy from integrations/pom.xml
copy to integrations/chat-models/bedrock/pom.xml
index 0e5df222..a10b7a45 100644
--- a/integrations/pom.xml
+++ b/integrations/chat-models/bedrock/pom.xml
@@ -22,26 +22,27 @@ under the License.
 
     <parent>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-agents</artifactId>
+        <artifactId>flink-agents-integrations-chat-models</artifactId>
         <version>0.3-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>flink-agents-integrations</artifactId>
-    <name>Flink Agents : Integrations:</name>
-    <packaging>pom</packaging>
+    <artifactId>flink-agents-integrations-chat-models-bedrock</artifactId>
+    <name>Flink Agents : Integrations: Chat Models: Bedrock</name>
+    <packaging>jar</packaging>
 
-    <properties>
-        <ollama4j.version>1.1.5</ollama4j.version>
-        <elasticsearch.version>8.19.0</elasticsearch.version>
-        <openai.version>4.8.0</openai.version>
-        <anthropic.version>2.11.1</anthropic.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
-    <modules>
-        <module>chat-models</module>
-        <module>embedding-models</module>
-        <module>vector-stores</module>
-        <module>mcp</module>
-    </modules>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>bedrockruntime</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git 
a/integrations/chat-models/bedrock/src/main/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelConnection.java
 
b/integrations/chat-models/bedrock/src/main/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelConnection.java
new file mode 100644
index 00000000..96b7683c
--- /dev/null
+++ 
b/integrations/chat-models/bedrock/src/main/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelConnection.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.chatmodels.bedrock;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.RetryExecutor;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.tools.Tool;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.core.SdkNumber;
+import software.amazon.awssdk.core.document.Document;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClient;
+import software.amazon.awssdk.services.bedrockruntime.model.ContentBlock;
+import software.amazon.awssdk.services.bedrockruntime.model.ConversationRole;
+import software.amazon.awssdk.services.bedrockruntime.model.ConverseRequest;
+import software.amazon.awssdk.services.bedrockruntime.model.ConverseResponse;
+import 
software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration;
+import software.amazon.awssdk.services.bedrockruntime.model.Message;
+import software.amazon.awssdk.services.bedrockruntime.model.SystemContentBlock;
+import software.amazon.awssdk.services.bedrockruntime.model.ToolConfiguration;
+import software.amazon.awssdk.services.bedrockruntime.model.ToolInputSchema;
+import software.amazon.awssdk.services.bedrockruntime.model.ToolResultBlock;
+import 
software.amazon.awssdk.services.bedrockruntime.model.ToolResultContentBlock;
+import software.amazon.awssdk.services.bedrockruntime.model.ToolSpecification;
+import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlock;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Bedrock Converse API chat model connection for flink-agents.
+ *
+ * <p>Uses the Converse API which provides a unified interface across all 
Bedrock models with native
+ * tool calling support. Authentication is handled via SigV4 using the default 
AWS credentials
+ * chain.
+ *
+ * <p>Future work: support reasoning content blocks (Claude extended 
thinking), citation blocks, and
+ * image/document content blocks.
+ *
+ * <p>Supported connection parameters:
+ *
+ * <ul>
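+ *   <li><b>region</b> (optional): AWS region (defaults to us-east-1)
+ *   <li><b>model</b> (optional): Default model ID (e.g.
+ *       us.anthropic.claude-sonnet-4-20250514-v1:0), used when a chat call does not name a model
+ *   <li><b>max_retries</b> (optional): maximum number of retries for retryable Bedrock errors
+ *       (defaults to 5)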
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @ChatModelConnection
+ * public static ResourceDescriptor bedrockConnection() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(BedrockChatModelConnection.class.getName())
+ *             .addInitialArgument("region", "us-east-1")
+ *             .addInitialArgument("model", 
"us.anthropic.claude-sonnet-4-20250514-v1:0")
+ *             .build();
+ * }
+ * }</pre>
+ */
+public class BedrockChatModelConnection extends BaseChatModelConnection {
+
+    // Shared JSON mapper; used to parse a tool's input schema string into a map.
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    // Bedrock runtime client built in the constructor and released in close().
+    private final BedrockRuntimeClient client;
+    // Value of the "model" connection argument; used when a chat call names no model. May be null.
+    private final String defaultModel;
+    // Wraps each Converse call so retryable failures are retried.
+    private final RetryExecutor retryExecutor;
+
+    /**
+     * Creates a Bedrock connection from the given resource descriptor.
+     *
+     * <p>Descriptor arguments read here: {@code region} (falls back to {@code us-east-1} when
+     * absent or blank), {@code model} (default model ID, may be absent) and {@code max_retries}
+     * (falls back to 5). Credentials come from {@link DefaultCredentialsProvider}.
+     *
+     * @param descriptor resource descriptor carrying the connection arguments
+     * @param getResource resource lookup function, passed through to the base class
+     */
+    public BedrockChatModelConnection(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+
+        String region = descriptor.getArgument("region");
+        if (region == null || region.isBlank()) {
+            region = "us-east-1";
+        }
+
+        this.client =
+                BedrockRuntimeClient.builder()
+                        .region(Region.of(region))
+                        .credentialsProvider(DefaultCredentialsProvider.create())
+                        .build();
+
+        this.defaultModel = descriptor.getArgument("model");
+
+        // Read max_retries as a Number rather than an Integer: a value that arrives as a Long (or
+        // any other numeric type) would otherwise fail with a ClassCastException. This mirrors how
+        // BedrockChatModelSetup reads its numeric arguments.
+        Number retries = descriptor.<Number>getArgument("max_retries");
+        this.retryExecutor =
+                RetryExecutor.builder()
+                        .maxRetries(retries != null ? retries.intValue() : 5)
+                        .initialBackoffMs(200)
+                        .retryablePredicate(BedrockChatModelConnection::isRetryable)
+                        .build();
+    }
+
+    /**
+     * Sends the conversation to Bedrock through the Converse API and returns the model's reply.
+     *
+     * <p>SYSTEM messages are passed as system content blocks; all other messages form the
+     * conversation. {@code temperature} and {@code max_tokens} are taken from {@code arguments}
+     * when they are numbers. Token metrics are recorded when the response carries usage data.
+     *
+     * @param messages chat history, including any system messages
+     * @param tools tools offered to the model; may be null or empty
+     * @param arguments per-call parameters ({@code model}, {@code temperature}, {@code
+     *     max_tokens}); may be null
+     * @return the assistant message converted from the Converse response
+     */
+    @Override
+    public ChatMessage chat(
+            List<ChatMessage> messages, List<Tool> tools, Map<String, Object> arguments) {
+        final String modelId = resolveModel(arguments);
+
+        // One pass over the history: system prompts on one side, the conversation on the other.
+        List<SystemContentBlock> systemBlocks = new ArrayList<>();
+        List<ChatMessage> conversation = new ArrayList<>();
+        for (ChatMessage message : messages) {
+            if (message.getRole() == MessageRole.SYSTEM) {
+                systemBlocks.add(SystemContentBlock.builder().text(message.getContent()).build());
+            } else {
+                conversation.add(message);
+            }
+        }
+
+        ConverseRequest.Builder converse = ConverseRequest.builder();
+        converse.modelId(modelId);
+        converse.messages(mergeMessages(conversation));
+        if (!systemBlocks.isEmpty()) {
+            converse.system(systemBlocks);
+        }
+
+        if (tools != null && !tools.isEmpty()) {
+            List<software.amazon.awssdk.services.bedrockruntime.model.Tool> bedrockTools =
+                    tools.stream().map(this::toBedrockTool).collect(Collectors.toList());
+            converse.toolConfig(ToolConfiguration.builder().tools(bedrockTools).build());
+        }
+
+        // Inference config is attached only when at least one numeric setting is present.
+        if (arguments != null) {
+            Object temperature = arguments.get("temperature");
+            Object maxTokens = arguments.get("max_tokens");
+            boolean hasTemperature = temperature instanceof Number;
+            boolean hasMaxTokens = maxTokens instanceof Number;
+            if (hasTemperature || hasMaxTokens) {
+                InferenceConfiguration.Builder inference = InferenceConfiguration.builder();
+                if (hasTemperature) {
+                    inference.temperature(((Number) temperature).floatValue());
+                }
+                if (hasMaxTokens) {
+                    inference.maxTokens(((Number) maxTokens).intValue());
+                }
+                converse.inferenceConfig(inference.build());
+            }
+        }
+
+        final ConverseRequest request = converse.build();
+        ConverseResponse response =
+                retryExecutor.execute(() -> client.converse(request), "BedrockConverse");
+
+        if (response.usage() != null) {
+            recordTokenMetrics(
+                    modelId, response.usage().inputTokens(), response.usage().outputTokens());
+        }
+
+        return convertResponse(response);
+    }
+
+    /**
+     * Decides whether a failed Converse call should be retried.
+     *
+     * <p>A failure is retryable when its string form names a throttling, service-unavailable or
+     * model-error exception, or when it (or any exception in its cause chain) is an SDK service
+     * exception with HTTP status 429 or 503.
+     *
+     * <p>The status is read from the exception itself rather than searched for as text: a bare
+     * "429" or "503" substring can appear in request IDs, token counts or model IDs and would
+     * trigger retries of non-retryable failures.
+     *
+     * @param e the failure raised by the Converse call
+     * @return {@code true} if the call should be retried
+     */
+    private static boolean isRetryable(Exception e) {
+        if (e == null) {
+            return false;
+        }
+
+        String description = e.toString();
+        if (description.contains("ThrottlingException")
+                || description.contains("ServiceUnavailableException")
+                || description.contains("ModelErrorException")) {
+            return true;
+        }
+
+        // Walk the cause chain so a service exception wrapped by another layer is still seen.
+        for (Throwable t = e; t != null; t = t.getCause()) {
+            if (t instanceof software.amazon.awssdk.core.exception.SdkServiceException) {
+                int status =
+                        ((software.amazon.awssdk.core.exception.SdkServiceException) t)
+                                .statusCode();
+                if (status == 429 || status == 503) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /** Releases the underlying Bedrock runtime client. */
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+
+    /**
+     * Picks the model ID for a call: the {@code model} entry of {@code arguments} when it is
+     * present and not blank, otherwise the connection's default model.
+     *
+     * @param arguments per-call parameters; may be null
+     * @return the model ID to use
+     * @throws IllegalArgumentException if neither source provides a non-blank model ID
+     */
+    private String resolveModel(Map<String, Object> arguments) {
+        String requested = null;
+        if (arguments != null) {
+            requested = (String) arguments.get("model");
+        }
+
+        boolean requestedMissing = requested == null || requested.isBlank();
+        String effective = requestedMissing ? this.defaultModel : requested;
+
+        if (effective == null || effective.isBlank()) {
+            throw new IllegalArgumentException("No model specified for Bedrock.");
+        }
+        return effective;
+    }
+
+    /**
+     * Converts chat messages to Bedrock messages, folding every run of consecutive TOOL messages
+     * into one USER message that carries one toolResult content block per tool message, as
+     * required by Bedrock Converse API.
+     *
+     * @param msgs non-system chat messages in conversation order
+     * @return the Bedrock messages, in the same order
+     */
+    private List<Message> mergeMessages(List<ChatMessage> msgs) {
+        List<Message> merged = new ArrayList<>();
+        // Tool results collected from the current run of TOOL messages, not yet emitted.
+        List<ContentBlock> pendingToolResults = new ArrayList<>();
+
+        for (ChatMessage current : msgs) {
+            if (current.getRole() == MessageRole.TOOL) {
+                String toolUseId = (String) current.getExtraArgs().get("externalId");
+                ToolResultContentBlock payload =
+                        ToolResultContentBlock.builder().text(current.getContent()).build();
+                ToolResultBlock toolResult =
+                        ToolResultBlock.builder().toolUseId(toolUseId).content(payload).build();
+                pendingToolResults.add(ContentBlock.fromToolResult(toolResult));
+                continue;
+            }
+
+            // A non-tool message ends the current run: emit it before the message itself.
+            if (!pendingToolResults.isEmpty()) {
+                merged.add(
+                        Message.builder()
+                                .role(ConversationRole.USER)
+                                .content(pendingToolResults)
+                                .build());
+                pendingToolResults = new ArrayList<>();
+            }
+            merged.add(toBedrockMessage(current));
+        }
+
+        // The history may end with tool results.
+        if (!pendingToolResults.isEmpty()) {
+            merged.add(
+                    Message.builder()
+                            .role(ConversationRole.USER)
+                            .content(pendingToolResults)
+                            .build());
+        }
+        return merged;
+    }
+
+    /**
+     * Converts a single chat message to its Bedrock form.
+     *
+     * <p>USER becomes a USER text message. ASSISTANT becomes an ASSISTANT message with an optional
+     * text block followed by one toolUse block per recorded tool call. TOOL becomes a USER message
+     * with a single toolResult block.
+     *
+     * @param msg the message to convert
+     * @return the Bedrock message
+     * @throws IllegalArgumentException if the role is not USER, ASSISTANT or TOOL
+     */
+    private Message toBedrockMessage(ChatMessage msg) {
+        switch (msg.getRole()) {
+            case USER:
+                {
+                    ContentBlock userText = ContentBlock.fromText(msg.getContent());
+                    return Message.builder().role(ConversationRole.USER).content(userText).build();
+                }
+            case ASSISTANT:
+                {
+                    List<ContentBlock> parts = new ArrayList<>();
+                    String text = msg.getContent();
+                    if (text != null && !text.isEmpty()) {
+                        parts.add(ContentBlock.fromText(text));
+                    }
+                    if (msg.getToolCalls() != null) {
+                        for (Map<String, Object> call : msg.getToolCalls()) {
+                            @SuppressWarnings("unchecked")
+                            Map<String, Object> function =
+                                    (Map<String, Object>) call.get("function");
+                            String callId = (String) call.get("id");
+                            String toolName = (String) function.get("name");
+                            Object toolArgs = function.get("arguments");
+                            ToolUseBlock toolUse =
+                                    ToolUseBlock.builder()
+                                            .toolUseId(callId)
+                                            .name(toolName)
+                                            .input(toDocument(toolArgs))
+                                            .build();
+                            parts.add(ContentBlock.fromToolUse(toolUse));
+                        }
+                    }
+                    return Message.builder().role(ConversationRole.ASSISTANT).content(parts).build();
+                }
+            case TOOL:
+                {
+                    String resultFor = (String) msg.getExtraArgs().get("externalId");
+                    ToolResultContentBlock payload =
+                            ToolResultContentBlock.builder().text(msg.getContent()).build();
+                    ToolResultBlock toolResult =
+                            ToolResultBlock.builder().toolUseId(resultFor).content(payload).build();
+                    return Message.builder()
+                            .role(ConversationRole.USER)
+                            .content(ContentBlock.fromToolResult(toolResult))
+                            .build();
+                }
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported role for Bedrock: " + msg.getRole());
+        }
+    }
+
+    /**
+     * Builds the Bedrock tool definition for a flink-agents tool.
+     *
+     * <p>Name and description come from the tool metadata. When the metadata carries a non-blank
+     * input schema, the schema JSON is parsed and attached as the tool's input schema.
+     *
+     * @param tool the tool to describe to the model
+     * @return the Bedrock tool wrapping the tool specification
+     * @throws RuntimeException if the input schema is not valid JSON
+     */
+    private software.amazon.awssdk.services.bedrockruntime.model.Tool toBedrockTool(Tool tool) {
+        ToolMetadata metadata = tool.getMetadata();
+
+        ToolSpecification.Builder spec = ToolSpecification.builder();
+        spec.name(metadata.getName());
+        spec.description(metadata.getDescription());
+
+        String schemaJson = metadata.getInputSchema();
+        if (schemaJson != null && !schemaJson.isBlank()) {
+            Map<String, Object> parsedSchema;
+            try {
+                parsedSchema =
+                        MAPPER.readValue(schemaJson, new TypeReference<Map<String, Object>>() {});
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException("Failed to parse tool schema.", e);
+            }
+            spec.inputSchema(ToolInputSchema.fromJson(toDocument(parsedSchema)));
+        }
+
+        return software.amazon.awssdk.services.bedrockruntime.model.Tool.builder()
+                .toolSpec(spec.build())
+                .build();
+    }
+
+    /**
+     * Converts a Converse response into an assistant chat message.
+     *
+     * <p>Text blocks are concatenated in order. Each toolUse block becomes a tool-call map with
+     * the keys {@code id}, {@code type}, {@code function} and {@code original_id}. Markdown fences
+     * are stripped from the text only when the response contains no tool calls.
+     *
+     * @param response the Converse response
+     * @return the assistant message, with tool calls attached when the model requested any
+     */
+    private ChatMessage convertResponse(ConverseResponse response) {
+        StringBuilder text = new StringBuilder();
+        List<Map<String, Object>> calls = new ArrayList<>();
+
+        for (ContentBlock part : response.output().message().content()) {
+            String partText = part.text();
+            if (partText != null) {
+                text.append(partText);
+            }
+
+            ToolUseBlock requested = part.toolUse();
+            if (requested == null) {
+                continue;
+            }
+            Map<String, Object> function = new LinkedHashMap<>();
+            function.put("name", requested.name());
+            function.put("arguments", documentToMap(requested.input()));
+
+            Map<String, Object> call = new LinkedHashMap<>();
+            call.put("id", requested.toolUseId());
+            call.put("type", "function");
+            call.put("function", function);
+            call.put("original_id", requested.toolUseId());
+            calls.add(call);
+        }
+
+        if (calls.isEmpty()) {
+            // Only strip markdown fences for non-tool-call responses.
+            return ChatMessage.assistant(stripMarkdownFences(text.toString()));
+        }
+
+        ChatMessage reply = ChatMessage.assistant(text.toString());
+        reply.setToolCalls(calls);
+        return reply;
+    }
+
+    /**
+     * Strip markdown code fences from text responses. Some Bedrock models 
wrap JSON output in
+     * markdown fences like {@code ```json ... ```}.
+     *
+     * <p>Only strips code fences; does not extract JSON from arbitrary text, 
as that could corrupt
+     * normal prose responses containing braces.
+     */
+    static String stripMarkdownFences(String text) {
+        if (text == null) return null;
+        String trimmed = text.trim();
+        if (trimmed.startsWith("```")) {
+            int firstNewline = trimmed.indexOf('\n');
+            if (firstNewline >= 0) {
+                trimmed = trimmed.substring(firstNewline + 1);
+            }
+            if (trimmed.endsWith("```")) {
+                trimmed = trimmed.substring(0, trimmed.length() - 3).trim();
+            }
+            return trimmed;
+        }
+        return trimmed;
+    }
+
+    /**
+     * Recursively converts a plain Java value into an SDK {@link Document}.
+     *
+     * <p>{@code null} maps to the null document; maps and lists are converted element by element;
+     * strings, numbers and booleans map to their document counterparts. Any other object is
+     * stored as its {@code toString()} text.
+     *
+     * @param obj the value to convert; may be null
+     * @return the equivalent document
+     */
+    @SuppressWarnings("unchecked")
+    private Document toDocument(Object obj) {
+        if (obj == null) {
+            return Document.fromNull();
+        }
+        if (obj instanceof Map) {
+            Map<String, Document> converted = new LinkedHashMap<>();
+            for (Map.Entry<String, Object> entry : ((Map<String, Object>) obj).entrySet()) {
+                String key = entry.getKey();
+                converted.put(key, toDocument(entry.getValue()));
+            }
+            return Document.fromMap(converted);
+        }
+        if (obj instanceof List) {
+            List<Document> converted = new ArrayList<>();
+            for (Object element : (List<Object>) obj) {
+                converted.add(toDocument(element));
+            }
+            return Document.fromList(converted);
+        }
+        if (obj instanceof String) {
+            return Document.fromString((String) obj);
+        }
+        if (obj instanceof Number) {
+            // Go through the decimal text form so the exact numeric value is carried over.
+            BigDecimal exact = new BigDecimal(obj.toString());
+            return Document.fromNumber(SdkNumber.fromBigDecimal(exact));
+        }
+        if (obj instanceof Boolean) {
+            return Document.fromBoolean((Boolean) obj);
+        }
+        return Document.fromString(obj.toString());
+    }
+
+    /**
+     * Converts a map-shaped {@link Document} into a plain Java map, preserving entry order.
+     *
+     * @param doc the document to convert; may be null
+     * @return the converted map, or an empty map when {@code doc} is null or not a map
+     */
+    private Map<String, Object> documentToMap(Document doc) {
+        boolean isMapDocument = doc != null && doc.isMap();
+        if (!isMapDocument) {
+            return Collections.emptyMap();
+        }
+        Map<String, Object> plain = new LinkedHashMap<>();
+        for (Map.Entry<String, Document> entry : doc.asMap().entrySet()) {
+            plain.put(entry.getKey(), documentToObject(entry.getValue()));
+        }
+        return plain;
+    }
+
+    /**
+     * Recursively converts a {@link Document} into a plain Java value: String, BigDecimal,
+     * Boolean, List or Map.
+     *
+     * @param doc the document to convert; may be null
+     * @return the plain value, or {@code null} for a null reference or a null document
+     */
+    private Object documentToObject(Document doc) {
+        if (doc == null || doc.isNull()) {
+            return null;
+        }
+        if (doc.isString()) {
+            return doc.asString();
+        }
+        if (doc.isNumber()) {
+            return doc.asNumber().bigDecimalValue();
+        }
+        if (doc.isBoolean()) {
+            return doc.asBoolean();
+        }
+        if (doc.isList()) {
+            List<Object> elements = new ArrayList<>();
+            for (Document element : doc.asList()) {
+                elements.add(documentToObject(element));
+            }
+            return elements;
+        }
+        if (doc.isMap()) {
+            return documentToMap(doc);
+        }
+        // Fallback for any document kind not handled above.
+        return doc.toString();
+    }
+}
diff --git 
a/integrations/chat-models/bedrock/src/main/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelSetup.java
 
b/integrations/chat-models/bedrock/src/main/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelSetup.java
new file mode 100644
index 00000000..cbcd380b
--- /dev/null
+++ 
b/integrations/chat-models/bedrock/src/main/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelSetup.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.chatmodels.bedrock;
+
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Chat model setup for AWS Bedrock Converse API.
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ *   <li><b>connection</b> (required): name of the BedrockChatModelConnection 
resource
+ *   <li><b>model</b> (required): Bedrock model ID (e.g. 
us.anthropic.claude-sonnet-4-20250514-v1:0)
+ *   <li><b>temperature</b> (optional): sampling temperature (default 0.1)
+ *   <li><b>max_tokens</b> (optional): maximum tokens in the response
+ *   <li><b>prompt</b> (optional): prompt resource name
+ *   <li><b>tools</b> (optional): list of tool resource names
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @ChatModelSetup
+ * public static ResourceDescriptor bedrockModel() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(BedrockChatModelSetup.class.getName())
+ *             .addInitialArgument("connection", "bedrockConnection")
+ *             .addInitialArgument("model", 
"us.anthropic.claude-sonnet-4-20250514-v1:0")
+ *             .addInitialArgument("temperature", 0.1)
+ *             .addInitialArgument("max_tokens", 4096)
+ *             .build();
+ * }
+ * }</pre>
+ */
+public class BedrockChatModelSetup extends BaseChatModelSetup {
+
+    /** Sampling temperature; 0.1 when the descriptor does not provide one. */
+    private final Double temperature;
+
+    /** Maximum tokens in the response, or {@code null} when the descriptor does not set it. */
+    private final Integer maxTokens;
+
+    /**
+     * Reads the optional {@code temperature} and {@code max_tokens} arguments; both are accepted
+     * as any {@link Number}.
+     *
+     * @param descriptor resource descriptor carrying the setup arguments
+     * @param getResource resource lookup function, passed through to the base class
+     */
+    public BedrockChatModelSetup(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+
+        Number configuredTemperature = descriptor.<Number>getArgument("temperature");
+        if (configuredTemperature != null) {
+            this.temperature = configuredTemperature.doubleValue();
+        } else {
+            this.temperature = 0.1;
+        }
+
+        Number configuredMaxTokens = descriptor.<Number>getArgument("max_tokens");
+        if (configuredMaxTokens != null) {
+            this.maxTokens = configuredMaxTokens.intValue();
+        } else {
+            this.maxTokens = null;
+        }
+    }
+
+    /**
+     * Returns the per-call parameters: {@code temperature} always, {@code model} and {@code
+     * max_tokens} only when they are set.
+     */
+    @Override
+    public Map<String, Object> getParameters() {
+        Map<String, Object> parameters = new HashMap<>();
+        if (model != null) {
+            parameters.put("model", model);
+        }
+        parameters.put("temperature", temperature);
+        if (maxTokens != null) {
+            parameters.put("max_tokens", maxTokens);
+        }
+        return parameters;
+    }
+}
diff --git 
a/integrations/chat-models/bedrock/src/test/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelConnectionTest.java
 
b/integrations/chat-models/bedrock/src/test/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelConnectionTest.java
new file mode 100644
index 00000000..84c29481
--- /dev/null
+++ 
b/integrations/chat-models/bedrock/src/test/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelConnectionTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.chatmodels.bedrock;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Tests for {@link BedrockChatModelConnection}. */
+class BedrockChatModelConnectionTest {
+
+    /** Resource lookup that resolves nothing; the tests never dereference a resource. */
+    private static final BiFunction<String, ResourceType, Resource> NOOP = (a, b) -> null;
+
+    /** Model ID shared by the constructor tests. */
+    private static final String CLAUDE_SONNET = "us.anthropic.claude-sonnet-4-20250514-v1:0";
+
+    /** Builds a connection descriptor, adding only the arguments that are not null. */
+    private static ResourceDescriptor descriptor(String region, String model) {
+        ResourceDescriptor.Builder builder =
+                ResourceDescriptor.Builder.newBuilder(BedrockChatModelConnection.class.getName());
+        if (region != null) {
+            builder.addInitialArgument("region", region);
+        }
+        if (model != null) {
+            builder.addInitialArgument("model", model);
+        }
+        return builder.build();
+    }
+
+    @Test
+    @DisplayName("Constructor creates client with default region")
+    void testConstructorDefaultRegion() {
+        ResourceDescriptor withoutRegion = descriptor(null, CLAUDE_SONNET);
+        BedrockChatModelConnection connection =
+                new BedrockChatModelConnection(withoutRegion, NOOP);
+        assertNotNull(connection);
+    }
+
+    @Test
+    @DisplayName("Constructor creates client with explicit region")
+    void testConstructorExplicitRegion() {
+        ResourceDescriptor withRegion = descriptor("us-west-2", CLAUDE_SONNET);
+        BedrockChatModelConnection connection = new BedrockChatModelConnection(withRegion, NOOP);
+        assertNotNull(connection);
+    }
+
+    @Test
+    @DisplayName("Extends BaseChatModelConnection")
+    void testInheritance() {
+        ResourceDescriptor testDescriptor = descriptor("us-east-1", "test-model");
+        BedrockChatModelConnection connection =
+                new BedrockChatModelConnection(testDescriptor, NOOP);
+        assertThat(connection).isInstanceOf(BaseChatModelConnection.class);
+    }
+
+    @Test
+    @DisplayName("Chat throws when no model specified")
+    void testChatThrowsWithoutModel() {
+        // Neither the descriptor nor the call arguments name a model.
+        BedrockChatModelConnection connection =
+                new BedrockChatModelConnection(descriptor("us-east-1", null), NOOP);
+        List<ChatMessage> history = List.of(new ChatMessage(MessageRole.USER, "hello"));
+        assertThatThrownBy(() -> connection.chat(history, null, Collections.emptyMap()))
+                .isInstanceOf(RuntimeException.class);
+    }
+
+    @Test
+    @DisplayName("stripMarkdownFences: normal text with braces is not modified")
+    void testStripMarkdownFencesPreservesTextWithBraces() {
+        String prose = "Use the format {key: value} for config";
+        assertThat(BedrockChatModelConnection.stripMarkdownFences(prose))
+                .isEqualTo("Use the format {key: value} for config");
+    }
+
+    @Test
+    @DisplayName("stripMarkdownFences: clean JSON passes through")
+    void testStripMarkdownFencesCleanJson() {
+        String json = "{\"score\": 5, \"reasons\": []}";
+        assertThat(BedrockChatModelConnection.stripMarkdownFences(json))
+                .isEqualTo("{\"score\": 5, \"reasons\": []}");
+    }
+
+    @Test
+    @DisplayName("stripMarkdownFences: strips ```json fences")
+    void testStripMarkdownFencesJsonBlock() {
+        String fenced = "```json\n{\"score\": 5}\n```";
+        assertThat(BedrockChatModelConnection.stripMarkdownFences(fenced))
+                .isEqualTo("{\"score\": 5}");
+    }
+
+    @Test
+    @DisplayName("stripMarkdownFences: strips plain ``` fences")
+    void testStripMarkdownFencesPlainBlock() {
+        String fenced = "```\n{\"id\": \"P001\"}\n```";
+        assertThat(BedrockChatModelConnection.stripMarkdownFences(fenced))
+                .isEqualTo("{\"id\": \"P001\"}");
+    }
+
+    @Test
+    @DisplayName("stripMarkdownFences: null returns null")
+    void testStripMarkdownFencesNull() {
+        String stripped = BedrockChatModelConnection.stripMarkdownFences(null);
+        assertThat(stripped).isNull();
+    }
+}
diff --git 
a/integrations/chat-models/bedrock/src/test/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelSetupTest.java
 
b/integrations/chat-models/bedrock/src/test/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelSetupTest.java
new file mode 100644
index 00000000..05094f02
--- /dev/null
+++ 
b/integrations/chat-models/bedrock/src/test/java/org/apache/flink/agents/integrations/chatmodels/bedrock/BedrockChatModelSetupTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.chatmodels.bedrock;
+
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the parameters exposed by {@link BedrockChatModelSetup}. */
+class BedrockChatModelSetupTest {
+
+    /** Resource resolver stub that resolves every lookup to {@code null}. */
+    private static final BiFunction<String, ResourceType, Resource> NOOP =
+            (name, type) -> null;
+
+    /**
+     * Starts a setup descriptor that names the connection {@code "conn"} and uses the given
+     * model; callers may add further arguments before building.
+     */
+    private static ResourceDescriptor.Builder descriptorFor(String model) {
+        ResourceDescriptor.Builder builder =
+                ResourceDescriptor.Builder.newBuilder(BedrockChatModelSetup.class.getName());
+        builder.addInitialArgument("connection", "conn");
+        builder.addInitialArgument("model", model);
+        return builder;
+    }
+
+    @Test
+    @DisplayName("getParameters includes model and default temperature")
+    void testGetParametersDefaults() {
+        final String modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0";
+        BedrockChatModelSetup setup =
+                new BedrockChatModelSetup(descriptorFor(modelId).build(), NOOP);
+
+        // Read the parameters once and check both entries on the same map.
+        assertThat(setup.getParameters())
+                .containsEntry("model", modelId)
+                .containsEntry("temperature", 0.1);
+    }
+
+    @Test
+    @DisplayName("getParameters uses custom temperature")
+    void testGetParametersCustomTemperature() {
+        ResourceDescriptor.Builder builder = descriptorFor("test-model");
+        builder.addInitialArgument("temperature", 0.7);
+        BedrockChatModelSetup setup = new BedrockChatModelSetup(builder.build(), NOOP);
+
+        assertThat(setup.getParameters()).containsEntry("temperature", 0.7);
+    }
+
+    @Test
+    @DisplayName("Extends BaseChatModelSetup")
+    void testInheritance() {
+        BedrockChatModelSetup setup =
+                new BedrockChatModelSetup(descriptorFor("m").build(), NOOP);
+
+        assertThat(setup).isInstanceOf(BaseChatModelSetup.class);
+    }
+}
diff --git a/integrations/chat-models/pom.xml b/integrations/chat-models/pom.xml
index 20b1b425..e5f4b9d4 100644
--- a/integrations/chat-models/pom.xml
+++ b/integrations/chat-models/pom.xml
@@ -33,6 +33,7 @@ under the License.
     <modules>
         <module>anthropic</module>
         <module>azureai</module>
+        <module>bedrock</module>
         <module>ollama</module>
         <module>openai</module>
     </modules>
diff --git a/integrations/pom.xml 
b/integrations/embedding-models/bedrock/pom.xml
similarity index 61%
copy from integrations/pom.xml
copy to integrations/embedding-models/bedrock/pom.xml
index 0e5df222..353c32c8 100644
--- a/integrations/pom.xml
+++ b/integrations/embedding-models/bedrock/pom.xml
@@ -22,26 +22,27 @@ under the License.
 
     <parent>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-agents</artifactId>
+        <artifactId>flink-agents-integrations-embedding-models</artifactId>
         <version>0.3-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>flink-agents-integrations</artifactId>
-    <name>Flink Agents : Integrations:</name>
-    <packaging>pom</packaging>
+    <artifactId>flink-agents-integrations-embedding-models-bedrock</artifactId>
+    <name>Flink Agents : Integrations: Embedding Models: Bedrock</name>
+    <packaging>jar</packaging>
 
-    <properties>
-        <ollama4j.version>1.1.5</ollama4j.version>
-        <elasticsearch.version>8.19.0</elasticsearch.version>
-        <openai.version>4.8.0</openai.version>
-        <anthropic.version>2.11.1</anthropic.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
-    <modules>
-        <module>chat-models</module>
-        <module>embedding-models</module>
-        <module>vector-stores</module>
-        <module>mcp</module>
-    </modules>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>bedrockruntime</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git 
a/integrations/embedding-models/bedrock/src/main/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelConnection.java
 
b/integrations/embedding-models/bedrock/src/main/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelConnection.java
new file mode 100644
index 00000000..0d9a8783
--- /dev/null
+++ 
b/integrations/embedding-models/bedrock/src/main/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelConnection.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.embeddingmodels.bedrock;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.RetryExecutor;
+import 
org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClient;
+import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelRequest;
+import 
software.amazon.awssdk.services.bedrockruntime.model.InvokeModelResponse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.BiFunction;
+
+/**
+ * Bedrock embedding model connection using Amazon Titan Text Embeddings V2.
+ *
+ * <p>Uses the InvokeModel API to generate embeddings. Every request asks for a normalized vector
+ * and may carry an optional {@code dimensions} value (256, 512, or 1024). Since Titan V2
+ * processes one text per API call, batch embedding is parallelized via a configurable thread
+ * pool.
+ *
+ * <p>Supported connection parameters:
+ *
+ * <ul>
+ *   <li><b>region</b> (optional): AWS region, defaults to us-east-1
+ *   <li><b>model</b> (optional): default model ID, defaults to amazon.titan-embed-text-v2:0
+ *   <li><b>embed_concurrency</b> (optional): thread pool size for parallel embedding (default:
+ *       4); must be at least 1
+ *   <li><b>max_retries</b> (optional): retry budget handed to the {@link RetryExecutor}
+ *       (default: 5)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @EmbeddingModelConnection
+ * public static ResourceDescriptor bedrockEmbedding() {
+ *     return ResourceDescriptor.Builder.newBuilder(BedrockEmbeddingModelConnection.class.getName())
+ *             .addInitialArgument("region", "us-east-1")
+ *             .addInitialArgument("model", "amazon.titan-embed-text-v2:0")
+ *             .addInitialArgument("embed_concurrency", 8)
+ *             .build();
+ * }
+ * }</pre>
+ */
+public class BedrockEmbeddingModelConnection extends BaseEmbeddingModelConnection {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final String DEFAULT_MODEL = "amazon.titan-embed-text-v2:0";
+    private static final String DEFAULT_REGION = "us-east-1";
+    private static final int DEFAULT_EMBED_CONCURRENCY = 4;
+    private static final int DEFAULT_MAX_RETRIES = 5;
+
+    private final BedrockRuntimeClient client;
+    private final String defaultModel;
+    private final ExecutorService embedPool;
+    private final RetryExecutor retryExecutor;
+
+    /**
+     * Creates the connection from the descriptor arguments listed in the class documentation.
+     *
+     * @param descriptor resource descriptor holding the connection arguments
+     * @param getResource resolver passed through to the base class
+     * @throws IllegalArgumentException if {@code embed_concurrency} is smaller than 1
+     */
+    public BedrockEmbeddingModelConnection(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+
+        String region = descriptor.getArgument("region");
+        if (region == null || region.isBlank()) {
+            region = DEFAULT_REGION;
+        }
+
+        String model = descriptor.getArgument("model");
+        this.defaultModel = (model != null && !model.isBlank()) ? model : DEFAULT_MODEL;
+
+        // Validate before acquiring anything that must be closed: a bad value used to surface
+        // from the thread pool factory after the Bedrock client had already been built.
+        Integer concurrency = descriptor.getArgument("embed_concurrency");
+        int threads = concurrency != null ? concurrency : DEFAULT_EMBED_CONCURRENCY;
+        if (threads < 1) {
+            throw new IllegalArgumentException(
+                    "embed_concurrency must be at least 1, but was " + threads);
+        }
+
+        Integer retries = descriptor.getArgument("max_retries");
+        this.retryExecutor =
+                RetryExecutor.builder()
+                        .maxRetries(retries != null ? retries : DEFAULT_MAX_RETRIES)
+                        .initialBackoffMs(200)
+                        .retryablePredicate(BedrockEmbeddingModelConnection::isRetryable)
+                        .build();
+
+        this.embedPool = Executors.newFixedThreadPool(threads);
+
+        // Built last so that no client is left unclosed when an earlier step throws.
+        this.client =
+                BedrockRuntimeClient.builder()
+                        .region(Region.of(region))
+                        .credentialsProvider(DefaultCredentialsProvider.create())
+                        .build();
+    }
+
+    /**
+     * Embeds a single text with one InvokeModel call, retried through the retry executor.
+     *
+     * @param text text to embed
+     * @param parameters optional overrides: {@code model} (model ID) and {@code dimensions}
+     *     (any {@link Number}); {@code null} is treated as an empty map
+     * @return the embedding vector read from the response's {@code embedding} array
+     * @throws IllegalArgumentException if {@code dimensions} is present but not a number
+     * @throws RuntimeException if the response cannot be parsed or has no {@code embedding} array
+     */
+    @Override
+    public float[] embed(String text, Map<String, Object> parameters) {
+        Map<String, Object> params = parameters != null ? parameters : Map.<String, Object>of();
+
+        // A key that is present but mapped to null falls back to the default model as well.
+        Object requestedModel = params.get("model");
+        String model = requestedModel != null ? (String) requestedModel : defaultModel;
+        Integer dimensions = toDimensions(params.get("dimensions"));
+
+        ObjectNode body = MAPPER.createObjectNode();
+        body.put("inputText", text);
+        if (dimensions != null) {
+            body.put("dimensions", dimensions);
+        }
+        body.put("normalize", true);
+
+        // The request does not change between attempts, so build it once outside the retry loop.
+        InvokeModelRequest request =
+                InvokeModelRequest.builder()
+                        .modelId(model)
+                        .contentType("application/json")
+                        .body(SdkBytes.fromUtf8String(body.toString()))
+                        .build();
+
+        InvokeModelResponse response =
+                retryExecutor.execute(() -> client.invokeModel(request), "BedrockEmbed");
+
+        JsonNode result;
+        try {
+            result = MAPPER.readTree(response.body().asUtf8String());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse Bedrock embedding response.", e);
+        }
+
+        JsonNode embeddingNode = result != null ? result.get("embedding") : null;
+        if (embeddingNode == null || !embeddingNode.isArray()) {
+            throw new RuntimeException(
+                    "Failed to parse Bedrock embedding response: missing 'embedding' array.");
+        }
+
+        int length = embeddingNode.size();
+        float[] embedding = new float[length];
+        for (int i = 0; i < length; i++) {
+            embedding[i] = (float) embeddingNode.get(i).asDouble();
+        }
+        return embedding;
+    }
+
+    /**
+     * Converts the optional {@code dimensions} parameter to an {@link Integer}, accepting any
+     * {@link Number} so that e.g. a {@code Long} value does not fail with a cast error.
+     */
+    private static Integer toDimensions(Object value) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        throw new IllegalArgumentException(
+                "dimensions must be a number, but was " + value.getClass().getName());
+    }
+
+    /**
+     * Retry predicate for the retry executor.
+     *
+     * <p>Matching is textual on {@code e.toString()}: any exception whose string form contains
+     * one of the tokens below is treated as retryable.
+     */
+    private static boolean isRetryable(Exception e) {
+        String msg = e.toString();
+        return msg.contains("ThrottlingException")
+                || msg.contains("ModelErrorException")
+                || msg.contains("429")
+                || msg.contains("424")
+                || msg.contains("503");
+    }
+
+    /**
+     * Embeds several texts and returns the vectors in the order of {@code texts}.
+     *
+     * <p>Zero or one text is embedded on the calling thread; larger batches submit one task per
+     * text to the embedding pool and wait for all of them.
+     *
+     * @param texts texts to embed
+     * @param parameters same overrides as the single-text variant
+     * @return one embedding per input text, in input order
+     */
+    @Override
+    public List<float[]> embed(List<String> texts, Map<String, Object> parameters) {
+        List<float[]> results = new ArrayList<>(texts.size());
+        if (texts.size() <= 1) {
+            for (String text : texts) {
+                results.add(embed(text, parameters));
+            }
+            return results;
+        }
+
+        List<CompletableFuture<float[]>> futures = new ArrayList<>(texts.size());
+        for (String text : texts) {
+            futures.add(CompletableFuture.supplyAsync(() -> embed(text, parameters), embedPool));
+        }
+        // Wait for the whole batch; a failed task makes join() throw.
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+        for (CompletableFuture<float[]> future : futures) {
+            results.add(future.join());
+        }
+        return results;
+    }
+
+    /** Shuts down the embedding pool and closes the Bedrock client. */
+    @Override
+    public void close() throws Exception {
+        try {
+            this.embedPool.shutdown();
+        } finally {
+            // Close the client even if the pool shutdown throws.
+            this.client.close();
+        }
+    }
+}
diff --git 
a/integrations/embedding-models/bedrock/src/main/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelSetup.java
 
b/integrations/embedding-models/bedrock/src/main/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelSetup.java
new file mode 100644
index 00000000..90dc1934
--- /dev/null
+++ 
b/integrations/embedding-models/bedrock/src/main/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelSetup.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.embeddingmodels.bedrock;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Setup resource that supplies request parameters for Bedrock Titan Text Embeddings.
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ *   <li><b>connection</b> (required): name of the BedrockEmbeddingModelConnection resource
+ *   <li><b>model</b> (optional): model ID (default: amazon.titan-embed-text-v2:0)
+ *   <li><b>dimensions</b> (optional): embedding dimensions (256, 512, or 1024)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @EmbeddingModelSetup
+ * public static ResourceDescriptor bedrockEmbeddingSetup() {
+ *     return ResourceDescriptor.Builder.newBuilder(BedrockEmbeddingModelSetup.class.getName())
+ *             .addInitialArgument("connection", "bedrockEmbedding")
+ *             .addInitialArgument("model", "amazon.titan-embed-text-v2:0")
+ *             .addInitialArgument("dimensions", 1024)
+ *             .build();
+ * }
+ * }</pre>
+ */
+public class BedrockEmbeddingModelSetup extends BaseEmbeddingModelSetup {
+
+    /** Requested embedding size; {@code null} when the descriptor does not set it. */
+    private final Integer dimensions;
+
+    public BedrockEmbeddingModelSetup(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+        this.dimensions = descriptor.getArgument("dimensions");
+    }
+
+    /**
+     * Returns the request parameters, containing {@code model} and {@code dimensions} only when
+     * the respective value is set.
+     */
+    @Override
+    public Map<String, Object> getParameters() {
+        final Map<String, Object> configured = new HashMap<>();
+        putIfPresent(configured, "model", model);
+        putIfPresent(configured, "dimensions", dimensions);
+        return configured;
+    }
+
+    /** Stores {@code value} under {@code key} unless the value is {@code null}. */
+    private static void putIfPresent(Map<String, Object> target, String key, Object value) {
+        if (value == null) {
+            return;
+        }
+        target.put(key, value);
+    }
+
+    /** Narrows the inherited connection to the Bedrock-specific type. */
+    @Override
+    public BedrockEmbeddingModelConnection getConnection() {
+        return (BedrockEmbeddingModelConnection) super.getConnection();
+    }
+}
diff --git 
a/integrations/embedding-models/bedrock/src/test/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelTest.java
 
b/integrations/embedding-models/bedrock/src/test/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelTest.java
new file mode 100644
index 00000000..3d2d3d07
--- /dev/null
+++ 
b/integrations/embedding-models/bedrock/src/test/java/org/apache/flink/agents/integrations/embeddingmodels/bedrock/BedrockEmbeddingModelTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.embeddingmodels.bedrock;
+
+import 
org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelConnection;
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Tests for {@link BedrockEmbeddingModelConnection} and {@link BedrockEmbeddingModelSetup}. */
+class BedrockEmbeddingModelTest {
+
+    /** Resource resolver stub that resolves every lookup to {@code null}. */
+    private static final BiFunction<String, ResourceType, Resource> NOOP = (a, b) -> null;
+
+    /** Builds a connection descriptor, setting {@code region} only when one is given. */
+    private static ResourceDescriptor connDescriptor(String region) {
+        ResourceDescriptor.Builder b =
+                ResourceDescriptor.Builder.newBuilder(
+                        BedrockEmbeddingModelConnection.class.getName());
+        if (region != null) {
+            b.addInitialArgument("region", region);
+        }
+        return b.build();
+    }
+
+    @Test
+    @DisplayName("Connection constructor creates client with defaults")
+    void testConnectionDefaults() throws Exception {
+        BedrockEmbeddingModelConnection conn =
+                new BedrockEmbeddingModelConnection(connDescriptor(null), NOOP);
+        try {
+            assertNotNull(conn);
+            assertThat(conn).isInstanceOf(BaseEmbeddingModelConnection.class);
+        } finally {
+            // The connection owns a Bedrock client and a thread pool; release them after the test.
+            conn.close();
+        }
+    }
+
+    @Test
+    @DisplayName("Connection constructor with explicit region and concurrency")
+    void testConnectionExplicitParams() throws Exception {
+        ResourceDescriptor desc =
+                ResourceDescriptor.Builder.newBuilder(
+                                BedrockEmbeddingModelConnection.class.getName())
+                        .addInitialArgument("region", "eu-west-1")
+                        .addInitialArgument("embed_concurrency", 8)
+                        .build();
+        BedrockEmbeddingModelConnection conn = new BedrockEmbeddingModelConnection(desc, NOOP);
+        try {
+            assertNotNull(conn);
+        } finally {
+            // The connection owns a Bedrock client and a thread pool; release them after the test.
+            conn.close();
+        }
+    }
+
+    @Test
+    @DisplayName("Setup getParameters includes model and dimensions")
+    void testSetupParameters() {
+        ResourceDescriptor desc =
+                ResourceDescriptor.Builder.newBuilder(BedrockEmbeddingModelSetup.class.getName())
+                        .addInitialArgument("connection", "conn")
+                        .addInitialArgument("model", "amazon.titan-embed-text-v2:0")
+                        .addInitialArgument("dimensions", 1024)
+                        .build();
+        BedrockEmbeddingModelSetup setup = new BedrockEmbeddingModelSetup(desc, NOOP);
+
+        Map<String, Object> params = setup.getParameters();
+        assertThat(params).containsEntry("model", "amazon.titan-embed-text-v2:0");
+        assertThat(params).containsEntry("dimensions", 1024);
+        assertThat(setup).isInstanceOf(BaseEmbeddingModelSetup.class);
+    }
+
+    @Test
+    @DisplayName("Setup getParameters omits null dimensions")
+    void testSetupParametersNoDimensions() {
+        ResourceDescriptor desc =
+                ResourceDescriptor.Builder.newBuilder(BedrockEmbeddingModelSetup.class.getName())
+                        .addInitialArgument("connection", "conn")
+                        .addInitialArgument("model", "amazon.titan-embed-text-v2:0")
+                        .build();
+        BedrockEmbeddingModelSetup setup = new BedrockEmbeddingModelSetup(desc, NOOP);
+
+        assertThat(setup.getParameters()).doesNotContainKey("dimensions");
+    }
+}
diff --git a/integrations/embedding-models/pom.xml 
b/integrations/embedding-models/pom.xml
index f1bc6c08..1845a480 100644
--- a/integrations/embedding-models/pom.xml
+++ b/integrations/embedding-models/pom.xml
@@ -31,6 +31,7 @@ under the License.
     <packaging>pom</packaging>
 
     <modules>
+        <module>bedrock</module>
         <module>ollama</module>
     </modules>
 
diff --git a/integrations/pom.xml b/integrations/pom.xml
index 0e5df222..9989a5f0 100644
--- a/integrations/pom.xml
+++ b/integrations/pom.xml
@@ -35,6 +35,7 @@ under the License.
         <elasticsearch.version>8.19.0</elasticsearch.version>
         <openai.version>4.8.0</openai.version>
         <anthropic.version>2.11.1</anthropic.version>
+        <aws.sdk.version>2.32.16</aws.sdk.version>
     </properties>
 
     <modules>

Reply via email to