This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 35a5eefdf150 CAMEL-22622 - Camel-AWS-Bedrock: Support Converse API (#19752)
35a5eefdf150 is described below
commit 35a5eefdf150007761d8da2fb72f92a9d1ca7740
Author: Andrea Cosentino <[email protected]>
AuthorDate: Wed Oct 29 13:23:56 2025 +0100
CAMEL-22622 - Camel-AWS-Bedrock: Support Converse API (#19752)
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../camel/catalog/components/aws-bedrock.json | 14 +-
.../aws2/bedrock/runtime/aws-bedrock.json | 14 +-
.../aws2/bedrock/runtime/BedrockConstants.java | 17 ++
.../aws2/bedrock/runtime/BedrockOperations.java | 6 +-
.../aws2/bedrock/runtime/BedrockProducer.java | 239 +++++++++++++++++++++
.../runtime/stream/ConverseStreamHandler.java | 172 +++++++++++++++
.../runtime/integration/BedrockProducerIT.java | 71 ++++++
.../integration/BedrockProducerStreamingIT.java | 4 +-
.../runtime/stream/ConverseStreamHandlerTest.java | 79 +++++++
.../dsl/BedrockEndpointBuilderFactory.java | 100 +++++++++
10 files changed, 706 insertions(+), 10 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json
index 4b75e1b80f4f..c392de4f2189 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json
@@ -28,7 +28,7 @@
"includeStreamingMetadata": { "index": 1, "kind": "property",
"displayName": "Include Streaming Metadata", "group": "producer", "label": "",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Whether to include
streaming metadata in the response hea [...]
"lazyStartProducer": { "index": 2, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fail [...]
"modelId": { "index": 3, "kind": "property", "displayName": "Model Id",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"java.lang.String", "enum": [ "amazon.titan-text-express-v1",
"amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1",
"amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1",
"amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0",
"amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0",
"amazon.nova-lite-v [...]
- "operation": { "index": 4, "kind": "property", "displayName": "Operation",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "configurationCl [...]
+ "operation": { "index": 4, "kind": "property", "displayName": "Operation",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated":
false, "deprecationNote": "", "autowired": false, "se [...]
"overrideEndpoint": { "index": 5, "kind": "property", "displayName":
"Override Endpoint", "group": "producer", "label": "", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Set the need for
overriding the endpoint. This option needs to be used in [...]
"pojoRequest": { "index": 6, "kind": "property", "displayName": "Pojo
Request", "group": "producer", "label": "", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If we want to use a POJO
request as body or not" },
"profileCredentialsName": { "index": 7, "kind": "property", "displayName":
"Profile Credentials Name", "group": "producer", "label": "", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "false",
"configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If using a profile
credentials provider, this para [...]
@@ -58,13 +58,21 @@
"CamelAwsBedrockStreamOutputMode": { "index": 3, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The streaming output mode (complete or
chunks)", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAM_OUTPUT_MODE"
},
"CamelAwsBedrockCompletionReason": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The completion reason for streaming
response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_COMPLETION_REASON"
},
"CamelAwsBedrockTokenCount": { "index": 5, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The number of tokens generated in
streaming response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_TOKEN_COUNT"
},
- "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The number of chunks received in
streaming response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT"
}
+ "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The number of chunks received in
streaming response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT"
},
+ "CamelAwsBedrockConverseMessages": { "index": 7, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "List<Message>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The conversation messages
for Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_MESSAGES"
},
+ "CamelAwsBedrockConverseSystem": { "index": 8, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "List<SystemContentBlock>", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "description": "The system prompts for
Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_SYSTEM"
},
+ "CamelAwsBedrockConverseInferenceConfig": { "index": 9, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "InferenceConfiguration", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "description": "The inference
configuration for Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_INFERENCE_CONFIG"
},
+ "CamelAwsBedrockConverseToolConfig": { "index": 10, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "ToolConfiguration", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The tool configuration for
Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_TOOL_CONFIG"
},
+ "CamelAwsBedrockConverseAdditionalFields": { "index": 11, "kind":
"header", "displayName": "", "group": "producer", "label": "", "required":
false, "javaType": "software.amazon.awssdk.core.document.Document",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The additional model request fields for Converse API",
"constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS"
},
+ "CamelAwsBedrockConverseStopReason": { "index": 12, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The stop reason from Converse API
response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_STOP_REASON"
},
+ "CamelAwsBedrockConverseUsage": { "index": 13, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "TokenUsage", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The usage metrics from
Converse API response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_USAGE"
},
+ "CamelAwsBedrockConverseOutputMessage": { "index": 14, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Message", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The output message from Converse API
response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_OUTPUT_MESSAGE"
}
},
"properties": {
"label": { "index": 0, "kind": "path", "displayName": "Label", "group":
"producer", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Logical name" },
"includeStreamingMetadata": { "index": 1, "kind": "parameter",
"displayName": "Include Streaming Metadata", "group": "producer", "label": "",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Whether to include
streaming metadata in the response he [...]
"modelId": { "index": 2, "kind": "parameter", "displayName": "Model Id",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"java.lang.String", "enum": [ "amazon.titan-text-express-v1",
"amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1",
"amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1",
"amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0",
"amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0",
"amazon.nova-lite- [...]
- "operation": { "index": 3, "kind": "parameter", "displayName":
"Operation", "group": "producer", "label": "", "required": true, "type":
"enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "configurationC [...]
+ "operation": { "index": 3, "kind": "parameter", "displayName":
"Operation", "group": "producer", "label": "", "required": true, "type":
"enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated":
false, "deprecationNote": "", "autowired": false, "s [...]
"overrideEndpoint": { "index": 4, "kind": "parameter", "displayName":
"Override Endpoint", "group": "producer", "label": "", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Set the need for
overriding the endpoint. This option needs to be used i [...]
"pojoRequest": { "index": 5, "kind": "parameter", "displayName": "Pojo
Request", "group": "producer", "label": "", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If we want to use a POJO
request as body or not" },
"profileCredentialsName": { "index": 6, "kind": "parameter",
"displayName": "Profile Credentials Name", "group": "producer", "label": "",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"false", "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If using a profile
credentials provider, this par [...]
diff --git
a/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json
b/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json
index 4b75e1b80f4f..c392de4f2189 100644
---
a/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json
+++
b/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json
@@ -28,7 +28,7 @@
"includeStreamingMetadata": { "index": 1, "kind": "property",
"displayName": "Include Streaming Metadata", "group": "producer", "label": "",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Whether to include
streaming metadata in the response hea [...]
"lazyStartProducer": { "index": 2, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fail [...]
"modelId": { "index": 3, "kind": "property", "displayName": "Model Id",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"java.lang.String", "enum": [ "amazon.titan-text-express-v1",
"amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1",
"amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1",
"amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0",
"amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0",
"amazon.nova-lite-v [...]
- "operation": { "index": 4, "kind": "property", "displayName": "Operation",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "configurationCl [...]
+ "operation": { "index": 4, "kind": "property", "displayName": "Operation",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated":
false, "deprecationNote": "", "autowired": false, "se [...]
"overrideEndpoint": { "index": 5, "kind": "property", "displayName":
"Override Endpoint", "group": "producer", "label": "", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Set the need for
overriding the endpoint. This option needs to be used in [...]
"pojoRequest": { "index": 6, "kind": "property", "displayName": "Pojo
Request", "group": "producer", "label": "", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If we want to use a POJO
request as body or not" },
"profileCredentialsName": { "index": 7, "kind": "property", "displayName":
"Profile Credentials Name", "group": "producer", "label": "", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "false",
"configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If using a profile
credentials provider, this para [...]
@@ -58,13 +58,21 @@
"CamelAwsBedrockStreamOutputMode": { "index": 3, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The streaming output mode (complete or
chunks)", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAM_OUTPUT_MODE"
},
"CamelAwsBedrockCompletionReason": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The completion reason for streaming
response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_COMPLETION_REASON"
},
"CamelAwsBedrockTokenCount": { "index": 5, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The number of tokens generated in
streaming response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_TOKEN_COUNT"
},
- "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The number of chunks received in
streaming response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT"
}
+ "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The number of chunks received in
streaming response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT"
},
+ "CamelAwsBedrockConverseMessages": { "index": 7, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "List<Message>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The conversation messages
for Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_MESSAGES"
},
+ "CamelAwsBedrockConverseSystem": { "index": 8, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "List<SystemContentBlock>", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "description": "The system prompts for
Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_SYSTEM"
},
+ "CamelAwsBedrockConverseInferenceConfig": { "index": 9, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "InferenceConfiguration", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "description": "The inference
configuration for Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_INFERENCE_CONFIG"
},
+ "CamelAwsBedrockConverseToolConfig": { "index": 10, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "ToolConfiguration", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The tool configuration for
Converse API", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_TOOL_CONFIG"
},
+ "CamelAwsBedrockConverseAdditionalFields": { "index": 11, "kind":
"header", "displayName": "", "group": "producer", "label": "", "required":
false, "javaType": "software.amazon.awssdk.core.document.Document",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The additional model request fields for Converse API",
"constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS"
},
+ "CamelAwsBedrockConverseStopReason": { "index": 12, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The stop reason from Converse API
response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_STOP_REASON"
},
+ "CamelAwsBedrockConverseUsage": { "index": 13, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "TokenUsage", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The usage metrics from
Converse API response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_USAGE"
},
+ "CamelAwsBedrockConverseOutputMessage": { "index": 14, "kind": "header",
"displayName": "", "group": "producer", "label": "", "required": false,
"javaType": "Message", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The output message from Converse API
response", "constantName":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_OUTPUT_MESSAGE"
}
},
"properties": {
"label": { "index": 0, "kind": "path", "displayName": "Label", "group":
"producer", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Logical name" },
"includeStreamingMetadata": { "index": 1, "kind": "parameter",
"displayName": "Include Streaming Metadata", "group": "producer", "label": "",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Whether to include
streaming metadata in the response he [...]
"modelId": { "index": 2, "kind": "parameter", "displayName": "Model Id",
"group": "producer", "label": "", "required": true, "type": "enum", "javaType":
"java.lang.String", "enum": [ "amazon.titan-text-express-v1",
"amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1",
"amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1",
"amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0",
"amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0",
"amazon.nova-lite- [...]
- "operation": { "index": 3, "kind": "parameter", "displayName":
"Operation", "group": "producer", "label": "", "required": true, "type":
"enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "configurationC [...]
+ "operation": { "index": 3, "kind": "parameter", "displayName":
"Operation", "group": "producer", "label": "", "required": true, "type":
"enum", "javaType":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [
"invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel",
"invokeTextModelStreaming", "invokeImageModelStreaming",
"invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated":
false, "deprecationNote": "", "autowired": false, "s [...]
"overrideEndpoint": { "index": 4, "kind": "parameter", "displayName":
"Override Endpoint", "group": "producer", "label": "", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "Set the need for
overriding the endpoint. This option needs to be used i [...]
"pojoRequest": { "index": 5, "kind": "parameter", "displayName": "Pojo
Request", "group": "producer", "label": "", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If we want to use a POJO
request as body or not" },
"profileCredentialsName": { "index": 6, "kind": "parameter",
"displayName": "Profile Credentials Name", "group": "producer", "label": "",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"false", "configurationClass":
"org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration",
"configurationField": "configuration", "description": "If using a profile
credentials provider, this par [...]
diff --git
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java
index f0df52b81a4b..09929fe76515 100644
---
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java
+++
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java
@@ -36,4 +36,21 @@ public interface BedrockConstants {
String STREAMING_TOKEN_COUNT = "CamelAwsBedrockTokenCount";
@Metadata(description = "The number of chunks received in streaming
response", javaType = "Integer")
String STREAMING_CHUNK_COUNT = "CamelAwsBedrockChunkCount";
+ @Metadata(description = "The conversation messages for Converse API",
javaType = "List<Message>")
+ String CONVERSE_MESSAGES = "CamelAwsBedrockConverseMessages";
+ @Metadata(description = "The system prompts for Converse API", javaType =
"List<SystemContentBlock>")
+ String CONVERSE_SYSTEM = "CamelAwsBedrockConverseSystem";
+ @Metadata(description = "The inference configuration for Converse API",
javaType = "InferenceConfiguration")
+ String CONVERSE_INFERENCE_CONFIG =
"CamelAwsBedrockConverseInferenceConfig";
+ @Metadata(description = "The tool configuration for Converse API",
javaType = "ToolConfiguration")
+ String CONVERSE_TOOL_CONFIG = "CamelAwsBedrockConverseToolConfig";
+ @Metadata(description = "The additional model request fields for Converse
API",
+ javaType = "software.amazon.awssdk.core.document.Document")
+ String CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS =
"CamelAwsBedrockConverseAdditionalFields";
+ @Metadata(description = "The stop reason from Converse API response",
javaType = "String")
+ String CONVERSE_STOP_REASON = "CamelAwsBedrockConverseStopReason";
+ @Metadata(description = "The usage metrics from Converse API response",
javaType = "TokenUsage")
+ String CONVERSE_USAGE = "CamelAwsBedrockConverseUsage";
+ @Metadata(description = "The output message from Converse API response",
javaType = "Message")
+ String CONVERSE_OUTPUT_MESSAGE = "CamelAwsBedrockConverseOutputMessage";
}
diff --git
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java
index b541f6e4c206..079c82fd0497 100644
---
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java
+++
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java
@@ -28,5 +28,9 @@ public enum BedrockOperations {
invokeImageModelStreaming,
- invokeEmbeddingsModelStreaming
+ invokeEmbeddingsModelStreaming,
+
+ converse,
+
+ converseStream
}
diff --git
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java
index 7ee309ea8983..001ed6bc0c4e 100644
---
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java
+++
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java
@@ -35,9 +35,16 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClient;
+import software.amazon.awssdk.services.bedrockruntime.model.ContentBlock;
+import software.amazon.awssdk.services.bedrockruntime.model.ConverseRequest;
+import software.amazon.awssdk.services.bedrockruntime.model.ConverseResponse;
+import
software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamRequest;
+import
software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration;
import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelRequest;
import
software.amazon.awssdk.services.bedrockruntime.model.InvokeModelResponse;
import
software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamRequest;
+import software.amazon.awssdk.services.bedrockruntime.model.SystemContentBlock;
+import software.amazon.awssdk.services.bedrockruntime.model.ToolConfiguration;
/**
* A Producer which sends messages to the Amazon Bedrock Service <a
href="http://aws.amazon.com/bedrock/">AWS
@@ -73,6 +80,12 @@ public class BedrockProducer extends DefaultProducer {
case invokeEmbeddingsModelStreaming:
invokeEmbeddingsModelStreaming(getEndpoint().getBedrockRuntimeClient(),
exchange);
break;
+ case converse:
+ converse(getEndpoint().getBedrockRuntimeClient(), exchange);
+ break;
+ case converseStream:
+ converseStream(exchange);
+ break;
default:
throw new IllegalArgumentException("Unsupported operation");
}
@@ -506,6 +519,232 @@ public class BedrockProducer extends DefaultProducer {
message.setHeader(BedrockConstants.STREAMING_CHUNK_COUNT,
metadata.getChunkCount());
}
+ /**
+ * Runs the synchronous Converse operation and maps the response onto the exchange message.
+ * <p>
+ * In POJO mode the message body must already be a ConverseRequest. Otherwise the request is built from the
+ * configured model id and the CONVERSE_* headers: CONVERSE_MESSAGES is required, while the system prompts,
+ * inference configuration, tool configuration and additional model request fields are optional.
+ * <p>
+ * When the response carries an output message, that message is stored in the CONVERSE_OUTPUT_MESSAGE header and
+ * the body is set to the concatenation of its text content blocks. If there is no output message the body is
+ * left as it was. Stop reason and usage are copied to headers when present.
+ *
+ * @param bedrockRuntimeClient the client used to invoke converse
+ * @param exchange the exchange supplying the request data and receiving the response
+ * @throws InvalidPayloadException declared for the mandatory body lookup in POJO mode (presumably raised when
+ * the body is missing)
+ * @throws IllegalArgumentException if the POJO body is not a ConverseRequest, or the messages header is absent
+ */
+ private void converse(BedrockRuntimeClient bedrockRuntimeClient, Exchange
exchange) throws InvalidPayloadException {
+ ConverseRequest request;
+
+ if (getConfiguration().isPojoRequest()) {
+ Object payload = exchange.getMessage().getMandatoryBody();
+ if (payload instanceof ConverseRequest) {
+ request = (ConverseRequest) payload;
+ } else {
+ throw new IllegalArgumentException(
+ "Converse operation requires ConverseRequest in POJO
mode");
+ }
+ } else {
+ // Build the request from message headers; the body is not consulted in this mode
+ ConverseRequest.Builder builder = ConverseRequest.builder();
+
+ // Set model ID
+ builder.modelId(getConfiguration().getModelId());
+
+ // Conversation messages are mandatory and are read from the header only
+ @SuppressWarnings("unchecked")
+ List<software.amazon.awssdk.services.bedrockruntime.model.Message>
messages
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_MESSAGES, List.class);
+ if (messages != null) {
+ builder.messages(messages);
+ } else {
+ throw new IllegalArgumentException(
+ "Converse operation requires messages in header " +
BedrockConstants.CONVERSE_MESSAGES);
+ }
+
+ // Optional: System prompts
+ @SuppressWarnings("unchecked")
+ List<SystemContentBlock> system
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_SYSTEM, List.class);
+ if (system != null) {
+ builder.system(system);
+ }
+
+ // Optional: Inference configuration
+ InferenceConfiguration inferenceConfig
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG,
InferenceConfiguration.class);
+ if (inferenceConfig != null) {
+ builder.inferenceConfig(inferenceConfig);
+ }
+
+ // Optional: Tool configuration
+ ToolConfiguration toolConfig
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_TOOL_CONFIG,
ToolConfiguration.class);
+ if (toolConfig != null) {
+ builder.toolConfig(toolConfig);
+ }
+
+ // Optional: Additional model request fields
+ software.amazon.awssdk.core.document.Document additionalFields =
exchange.getMessage()
+
.getHeader(BedrockConstants.CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS,
+
software.amazon.awssdk.core.document.Document.class);
+ if (additionalFields != null) {
+ builder.additionalModelRequestFields(additionalFields);
+ }
+
+ request = builder.build();
+ }
+
+ try {
+ ConverseResponse response = bedrockRuntimeClient.converse(request);
+
+ org.apache.camel.Message message = getMessageForResponse(exchange);
+
+ // Set the output message content as body
+ if (response.output() != null && response.output().message() !=
null) {
+ software.amazon.awssdk.services.bedrockruntime.model.Message
outputMessage = response.output().message();
+ message.setHeader(BedrockConstants.CONVERSE_OUTPUT_MESSAGE,
outputMessage);
+
+ // Extract text content from the message; content blocks without text are skipped
+ StringBuilder textContent = new StringBuilder();
+ for (ContentBlock content : outputMessage.content()) {
+ if (content.text() != null) {
+ textContent.append(content.text());
+ }
+ }
+ message.setBody(textContent.toString());
+ }
+
+ // Set metadata headers
+ if (response.stopReason() != null) {
+ message.setHeader(BedrockConstants.CONVERSE_STOP_REASON,
response.stopReason().toString());
+ }
+ if (response.usage() != null) {
+ message.setHeader(BedrockConstants.CONVERSE_USAGE,
response.usage());
+ }
+
+ } catch (AwsServiceException ase) {
+ // Trace the AWS error code, then rethrow the original exception unchanged
+ LOG.trace("Converse command returned the error code {}",
ase.awsErrorDetails().errorCode());
+ throw ase;
+ }
+ }
+
+ /**
+ * Prepares a ConverseStreamRequest and hands it to processConverseStreamingRequest.
+ * <p>
+ * In POJO mode the message body must already be a ConverseStreamRequest. Otherwise the request is built from
+ * the configured model id and the CONVERSE_* headers: CONVERSE_MESSAGES is required, while the system prompts,
+ * inference configuration, tool configuration and additional model request fields are optional.
+ *
+ * @param exchange the exchange supplying the request data and receiving the streamed response
+ * @throws InvalidPayloadException declared for the mandatory body lookup in POJO mode (presumably raised when
+ * the body is missing)
+ * @throws IllegalArgumentException if the POJO body is not a ConverseStreamRequest, or the messages header is
+ * absent
+ */
+ private void converseStream(Exchange exchange) throws
InvalidPayloadException {
+ ConverseStreamRequest request;
+
+ if (getConfiguration().isPojoRequest()) {
+ Object payload = exchange.getMessage().getMandatoryBody();
+ if (payload instanceof ConverseStreamRequest) {
+ request = (ConverseStreamRequest) payload;
+ } else {
+ throw new IllegalArgumentException(
+ "ConverseStream operation requires
ConverseStreamRequest in POJO mode");
+ }
+ } else {
+ // Build the request from message headers; the body is not consulted in this mode
+ ConverseStreamRequest.Builder builder =
ConverseStreamRequest.builder();
+
+ // Set model ID
+ builder.modelId(getConfiguration().getModelId());
+
+ // Conversation messages are mandatory and are read from the header only
+ @SuppressWarnings("unchecked")
+ List<software.amazon.awssdk.services.bedrockruntime.model.Message>
messages
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_MESSAGES, List.class);
+ if (messages != null) {
+ builder.messages(messages);
+ } else {
+ throw new IllegalArgumentException(
+ "ConverseStream operation requires messages in header
" + BedrockConstants.CONVERSE_MESSAGES);
+ }
+
+ // Optional: System prompts
+ @SuppressWarnings("unchecked")
+ List<SystemContentBlock> system
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_SYSTEM, List.class);
+ if (system != null) {
+ builder.system(system);
+ }
+
+ // Optional: Inference configuration
+ InferenceConfiguration inferenceConfig
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG,
InferenceConfiguration.class);
+ if (inferenceConfig != null) {
+ builder.inferenceConfig(inferenceConfig);
+ }
+
+ // Optional: Tool configuration
+ ToolConfiguration toolConfig
+ =
exchange.getMessage().getHeader(BedrockConstants.CONVERSE_TOOL_CONFIG,
ToolConfiguration.class);
+ if (toolConfig != null) {
+ builder.toolConfig(toolConfig);
+ }
+
+ // Optional: Additional model request fields
+ software.amazon.awssdk.core.document.Document additionalFields =
exchange.getMessage()
+
.getHeader(BedrockConstants.CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS,
+
software.amazon.awssdk.core.document.Document.class);
+ if (additionalFields != null) {
+ builder.additionalModelRequestFields(additionalFields);
+ }
+
+ request = builder.build();
+ }
+
+ // Execution, output-mode selection and response mapping all happen in the helper
+ processConverseStreamingRequest(request, exchange);
+ }
+
+    /**
+     * Executes a ConverseStream request on the async client, blocks until the stream has completed and writes the
+     * result to the exchange message.
+     * <p>
+     * The output mode is taken from the endpoint configuration (falling back to "complete") and can be overridden
+     * per message with the {@link BedrockConstants#STREAM_OUTPUT_MODE} header. In "chunks" mode the body becomes the
+     * list of received text chunks; in every other mode the body is the concatenated text. Stop reason, usage and
+     * chunk count headers are added when streaming metadata is enabled in the configuration.
+     *
+     * @param  request             the fully built streaming request
+     * @param  exchange            the exchange whose message receives the body and metadata headers
+     * @throws AwsServiceException if the service call fails; failures reported through the returned future are
+     *                             unwrapped from the {@link java.util.concurrent.CompletionException} that
+     *                             {@code join()} raises, so callers see the same exception type as for the
+     *                             synchronous converse operation
+     */
+    private void processConverseStreamingRequest(ConverseStreamRequest request, Exchange exchange) {
+        try {
+            String streamOutputMode = getConfiguration().getStreamOutputMode();
+            if (streamOutputMode == null) {
+                streamOutputMode = "complete";
+            }
+
+            // Per-message override: check and read the header on the same message so both see the same value
+            String headerMode = exchange.getMessage().getHeader(BedrockConstants.STREAM_OUTPUT_MODE, String.class);
+            if (ObjectHelper.isNotEmpty(headerMode)) {
+                streamOutputMode = headerMode;
+            }
+
+            org.apache.camel.Message message = getMessageForResponse(exchange);
+            org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.StreamMetadata metadata
+                    = new org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.StreamMetadata();
+
+            if ("chunks".equals(streamOutputMode)) {
+                // Chunks mode - the body becomes the list of text chunks in the order they were received
+                List<String> allChunks = new ArrayList<>();
+                getEndpoint().getBedrockRuntimeAsyncClient().converseStream(
+                        request,
+                        org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.createChunksHandler(
+                                metadata,
+                                allChunks,
+                                null))
+                        .join();
+
+                message.setBody(allChunks);
+            } else {
+                // Complete mode - accumulate all chunks and return the complete response text
+                StringBuilder fullText = new StringBuilder();
+                getEndpoint().getBedrockRuntimeAsyncClient().converseStream(
+                        request,
+                        org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.createCompleteHandler(
+                                metadata,
+                                fullText))
+                        .join();
+
+                message.setBody(fullText.toString());
+            }
+
+            // Metadata headers are identical for both modes, so they are set once here
+            if (getConfiguration().isIncludeStreamingMetadata()) {
+                setConverseStreamingMetadata(message, metadata);
+            }
+
+        } catch (java.util.concurrent.CompletionException ce) {
+            // join() wraps any failure of the async call; surface the AWS exception itself when that is the cause
+            if (ce.getCause() instanceof AwsServiceException) {
+                AwsServiceException ase = (AwsServiceException) ce.getCause();
+                LOG.trace("Converse Stream command returned the error code {}", ase.awsErrorDetails().errorCode());
+                throw ase;
+            }
+            throw ce;
+        } catch (AwsServiceException ase) {
+            // Kept for failures thrown directly rather than through the future
+            LOG.trace("Converse Stream command returned the error code {}", ase.awsErrorDetails().errorCode());
+            throw ase;
+        }
+    }
+
+ /**
+ * Copies the metadata collected while streaming a Converse response onto the message headers.
+ * <p>
+ * Stop reason and usage are only set when the stream reported them; the chunk count header is always set.
+ *
+ * @param message the message that receives the headers
+ * @param metadata the metadata populated by the stream handler
+ */
+ private void setConverseStreamingMetadata(
+ org.apache.camel.Message message,
+
org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.StreamMetadata
metadata) {
+ if (metadata.getStopReason() != null) {
+ message.setHeader(BedrockConstants.CONVERSE_STOP_REASON,
metadata.getStopReason());
+ }
+ if (metadata.getUsage() != null) {
+ message.setHeader(BedrockConstants.CONVERSE_USAGE,
metadata.getUsage());
+ }
+ message.setHeader(BedrockConstants.STREAMING_CHUNK_COUNT,
metadata.getChunkCount());
+ }
+
public static Message getMessageForResponse(final Exchange exchange) {
return exchange.getMessage();
}
diff --git
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandler.java
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandler.java
new file mode 100644
index 000000000000..1b242d5481a5
--- /dev/null
+++
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandler.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.bedrock.runtime.stream;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import
software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler;
+import software.amazon.awssdk.services.bedrockruntime.model.TokenUsage;
+
+/**
+ * Utility class for handling Converse API streaming responses from Bedrock models.
+ * <p>
+ * Each factory method builds a ConverseStreamResponseHandler whose callbacks write into objects supplied by the
+ * caller (a StreamMetadata plus either a StringBuilder or a list of chunks). The caller reads those objects once
+ * the stream has finished.
+ */
+public final class ConverseStreamHandler {
+
+ // NOTE(review): LOG is not referenced anywhere in this class as shown
+ private static final Logger LOG =
LoggerFactory.getLogger(ConverseStreamHandler.class);
+
+ private ConverseStreamHandler() {
+ // Utility class
+ }
+
+ /**
+ * Create a response handler for complete mode (accumulates all chunks).
+ * <p>
+ * Text deltas are appended to {@code fullText} as they arrive. Usage and stop reason are stored on
+ * {@code metadata} when the corresponding events carry them. On completion the chunk count and the
+ * accumulated text are copied to {@code metadata}.
+ *
+ * @param metadata the metadata object to populate
+ * @param fullText the string builder to accumulate text
+ * @return the response handler
+ */
+ public static ConverseStreamResponseHandler createCompleteHandler(
+ StreamMetadata metadata,
+ StringBuilder fullText) {
+
+ // Single-element array so the lambdas below can update the counter
+ int[] chunkCount = { 0 };
+
+ return ConverseStreamResponseHandler.builder()
+ .subscriber(ConverseStreamResponseHandler.Visitor.builder()
+ .onContentBlockDelta(delta -> {
+ if (delta.delta() != null && delta.delta().text()
!= null) {
+ fullText.append(delta.delta().text());
+ }
+ // Counted for every delta event, including those without text
+ chunkCount[0]++;
+ })
+ .onMetadata(metadataEvent -> {
+ if (metadataEvent.usage() != null) {
+ metadata.setUsage(metadataEvent.usage());
+ }
+ })
+ .onMessageStop(stop -> {
+ if (stop.stopReason() != null) {
+
metadata.setStopReason(stop.stopReason().toString());
+ }
+ })
+ .build())
+ .onComplete(() -> {
+ metadata.setChunkCount(chunkCount[0]);
+ metadata.setFullText(fullText.toString());
+ })
+ .build();
+ }
+
+ /**
+ * Create a response handler for chunks mode (emits each chunk).
+ * <p>
+ * Every text delta is added to {@code chunks} and, when a consumer is given, passed to it. Usage and stop
+ * reason are stored on {@code metadata} when the corresponding events carry them. On completion the chunk
+ * count and the same {@code chunks} list instance are stored on {@code metadata}.
+ *
+ * @param metadata the metadata object to populate
+ * @param chunks the list to collect chunks
+ * @param chunkConsumer optional consumer that receives each text chunk; may be {@code null}
+ * @return the response handler
+ */
+ public static ConverseStreamResponseHandler createChunksHandler(
+ StreamMetadata metadata,
+ List<String> chunks,
+ Consumer<String> chunkConsumer) {
+
+ // Single-element array so the lambdas below can update the counter
+ int[] chunkCount = { 0 };
+
+ return ConverseStreamResponseHandler.builder()
+ .subscriber(ConverseStreamResponseHandler.Visitor.builder()
+ .onContentBlockDelta(delta -> {
+ if (delta.delta() != null && delta.delta().text()
!= null) {
+ String text = delta.delta().text();
+ chunks.add(text);
+ if (chunkConsumer != null) {
+ chunkConsumer.accept(text);
+ }
+ }
+ // Counted for every delta event, so this can exceed chunks.size()
+ chunkCount[0]++;
+ })
+ .onMetadata(metadataEvent -> {
+ if (metadataEvent.usage() != null) {
+ metadata.setUsage(metadataEvent.usage());
+ }
+ })
+ .onMessageStop(stop -> {
+ if (stop.stopReason() != null) {
+
metadata.setStopReason(stop.stopReason().toString());
+ }
+ })
+ .build())
+ .onComplete(() -> {
+ metadata.setChunkCount(chunkCount[0]);
+ metadata.setChunks(chunks);
+ })
+ .build();
+ }
+
+ /**
+ * Metadata extracted from Converse streaming response.
+ * <p>
+ * Plain mutable holder filled in by the handlers created in this class; fields keep their Java default values
+ * until the matching event or completion callback has run.
+ */
+ public static class StreamMetadata {
+ // Accumulated text; set by the complete-mode handler on completion
+ private String fullText;
+ // Collected text chunks; set by the chunks-mode handler on completion
+ private List<String> chunks;
+ // String form of the stop reason from the message-stop event
+ private String stopReason;
+ // Token usage from the metadata event
+ private TokenUsage usage;
+ // Number of content-block-delta events seen; set on completion
+ private int chunkCount;
+
+ public String getFullText() {
+ return fullText;
+ }
+
+ public void setFullText(String fullText) {
+ this.fullText = fullText;
+ }
+
+ public List<String> getChunks() {
+ return chunks;
+ }
+
+ public void setChunks(List<String> chunks) {
+ this.chunks = chunks;
+ }
+
+ public String getStopReason() {
+ return stopReason;
+ }
+
+ public void setStopReason(String stopReason) {
+ this.stopReason = stopReason;
+ }
+
+ public TokenUsage getUsage() {
+ return usage;
+ }
+
+ public void setUsage(TokenUsage usage) {
+ this.usage = usage;
+ }
+
+ public int getChunkCount() {
+ return chunkCount;
+ }
+
+ public void setChunkCount(int chunkCount) {
+ this.chunkCount = chunkCount;
+ }
+ }
+}
diff --git
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java
index ce51d3f7ef1d..6455c673a2b8 100644
---
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java
+++
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java
@@ -26,6 +26,7 @@ import org.apache.camel.component.aws2.bedrock.BedrockModels;
import org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledIfSystemProperties;
@@ -45,6 +46,11 @@ class BedrockProducerIT extends CamelTestSupport {
@EndpointInject("mock:result")
private MockEndpoint result;
+ /**
+ * Resets the injected mock:result endpoint before every test.
+ */
+ @BeforeEach
+ public void resetMocks() {
+ result.reset();
+ }
+
@Test
public void testInvokeTitanExpressModel() throws InterruptedException {
@@ -867,6 +873,59 @@ class BedrockProducerIT extends CamelTestSupport {
MockEndpoint.assertIsSatisfied(context);
}
+ /**
+ * Sends one user message through the direct:converse_claude route, passing the conversation and an inference
+ * configuration as headers, and expects exactly one message on the mock endpoint.
+ * <p>
+ * The local variable named result shadows the mock endpoint field from its declaration onwards; the returned
+ * exchange is not inspected.
+ */
+ @Test
+ public void testConverseWithClaudeModel() throws InterruptedException {
+ result.expectedMessageCount(1);
+ final Exchange result = template.send("direct:converse_claude",
exchange -> {
+ // Create a message using the Converse API
+
java.util.List<software.amazon.awssdk.services.bedrockruntime.model.Message>
messages = new java.util.ArrayList<>();
+
messages.add(software.amazon.awssdk.services.bedrockruntime.model.Message.builder()
+
.role(software.amazon.awssdk.services.bedrockruntime.model.ConversationRole.USER)
+
.content(software.amazon.awssdk.services.bedrockruntime.model.ContentBlock
+ .fromText("What is the capital of France?"))
+ .build());
+
+
exchange.getMessage().setHeader(BedrockConstants.CONVERSE_MESSAGES, messages);
+
+ // Optional: Add inference configuration
+
software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration
inferenceConfig
+ =
software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration.builder()
+ .maxTokens(100)
+ .temperature(0.7f)
+ .build();
+
exchange.getMessage().setHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG,
inferenceConfig);
+ });
+
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ /**
+ * Sends one user message through the direct:converse_stream_claude route with the stream output mode header
+ * set to "complete" and an inference configuration header, and expects exactly one message on the mock
+ * endpoint.
+ * <p>
+ * The local variable named result shadows the mock endpoint field from its declaration onwards; the returned
+ * exchange is not inspected.
+ */
+ @Test
+ public void testConverseStreamWithClaudeModel() throws
InterruptedException {
+ result.expectedMessageCount(1);
+ final Exchange result = template.send("direct:converse_stream_claude",
exchange -> {
+ // Create a message using the Converse API
+
java.util.List<software.amazon.awssdk.services.bedrockruntime.model.Message>
messages = new java.util.ArrayList<>();
+
messages.add(software.amazon.awssdk.services.bedrockruntime.model.Message.builder()
+
.role(software.amazon.awssdk.services.bedrockruntime.model.ConversationRole.USER)
+
.content(software.amazon.awssdk.services.bedrockruntime.model.ContentBlock
+ .fromText("Tell me a short joke about Java
programming"))
+ .build());
+
+
exchange.getMessage().setHeader(BedrockConstants.CONVERSE_MESSAGES, messages);
+
exchange.getMessage().setHeader(BedrockConstants.STREAM_OUTPUT_MODE,
"complete");
+
+ // Optional: Add inference configuration
+
software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration
inferenceConfig
+ =
software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration.builder()
+ .maxTokens(200)
+ .temperature(0.9f)
+ .build();
+
exchange.getMessage().setHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG,
inferenceConfig);
+ });
+
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@@ -1057,6 +1116,18 @@ class BedrockProducerIT extends CamelTestSupport {
+ BedrockModels.MISTRAL_SMALL_2402.model)
.log("Completions: ${body}")
.to(result);
+
+ from("direct:converse_claude")
+
.to("aws-bedrock:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}})&region=us-east-1&operation=converse&modelId="
+ + BedrockModels.ANTROPHIC_CLAUDE_V3.model)
+ .log("Converse response: ${body}")
+ .to(result);
+
+ from("direct:converse_stream_claude")
+
.to("aws-bedrock:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}})&region=us-east-1&operation=converseStream&modelId="
+ + BedrockModels.ANTROPHIC_CLAUDE_V3.model)
+ .log("Converse stream response: ${body}")
+ .to(result);
}
};
}
diff --git
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java
index 6efe1bc9a7db..62e189dbe5b8 100644
---
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java
+++
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java
@@ -61,10 +61,8 @@ class BedrockProducerStreamingIT extends CamelTestSupport {
private MockEndpoint result;
@BeforeEach
- public void resetMocks() throws InterruptedException {
+ public void resetMocks() {
result.reset();
- // Add small delay to avoid AWS rate limiting between tests
- Thread.sleep(1000);
}
@Test
diff --git
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandlerTest.java
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandlerTest.java
new file mode 100644
index 000000000000..a07aa1a728ee
--- /dev/null
+++
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandlerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.bedrock.runtime.stream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import
software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Unit tests for {@link ConverseStreamHandler}: handler creation must have no side effects on the caller-supplied
+ * accumulators, and {@link ConverseStreamHandler.StreamMetadata} must round-trip its properties.
+ */
+class ConverseStreamHandlerTest {
+
+    @Test
+    void testCreateCompleteHandler() {
+        ConverseStreamHandler.StreamMetadata metadata = new ConverseStreamHandler.StreamMetadata();
+        StringBuilder fullText = new StringBuilder();
+
+        ConverseStreamResponseHandler handler = ConverseStreamHandler.createCompleteHandler(metadata, fullText);
+
+        assertNotNull(handler, "Handler should not be null");
+        // Nothing has been streamed yet, so merely building the handler must leave the accumulators untouched
+        assertEquals(0, fullText.length(), "Full text builder should still be empty");
+        assertEquals(0, metadata.getChunkCount(), "Chunk count should still be zero");
+        assertEquals(null, metadata.getFullText());
+    }
+
+    @Test
+    void testCreateChunksHandler() {
+        ConverseStreamHandler.StreamMetadata metadata = new ConverseStreamHandler.StreamMetadata();
+        List<String> chunks = new ArrayList<>();
+
+        // A null consumer is allowed: chunks are then only collected into the list
+        ConverseStreamResponseHandler handler
+                = ConverseStreamHandler.createChunksHandler(metadata, chunks, null);
+
+        assertNotNull(handler, "Handler should not be null");
+        // Nothing has been streamed yet, so merely building the handler must leave the accumulators untouched
+        assertEquals(0, chunks.size(), "Chunks list should still be empty");
+        assertEquals(0, metadata.getChunkCount(), "Chunk count should still be zero");
+        assertEquals(null, metadata.getChunks());
+    }
+
+    @Test
+    void testStreamMetadata() {
+        ConverseStreamHandler.StreamMetadata metadata = new ConverseStreamHandler.StreamMetadata();
+
+        // Initial state, checked before any setter is called
+        assertEquals(null, metadata.getUsage());
+        assertEquals(0, metadata.getChunkCount());
+
+        // Test setting and getting fullText
+        metadata.setFullText("Test response");
+        assertEquals("Test response", metadata.getFullText());
+
+        // Test setting and getting chunks
+        List<String> chunks = List.of("chunk1", "chunk2");
+        metadata.setChunks(chunks);
+        assertEquals(chunks, metadata.getChunks());
+
+        // Test setting and getting stopReason
+        metadata.setStopReason("end_turn");
+        assertEquals("end_turn", metadata.getStopReason());
+
+        // Test setting and getting chunkCount
+        metadata.setChunkCount(5);
+        assertEquals(5, metadata.getChunkCount());
+    }
+}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java
index aca85d17480b..8cdb93811d2f 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java
@@ -772,6 +772,106 @@ public interface BedrockEndpointBuilderFactory {
public String awsBedrockChunkCount() {
return "CamelAwsBedrockChunkCount";
}
+ /**
+ * The conversation messages for Converse API.
+ *
+ * The option is a: {@code List<Message>} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code AwsBedrockConverseMessages}.
+ */
+ public String awsBedrockConverseMessages() {
+ return "CamelAwsBedrockConverseMessages";
+ }
+ /**
+ * The system prompts for Converse API.
+ *
+ * The option is a: {@code List<SystemContentBlock>} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code AwsBedrockConverseSystem}.
+ */
+ public String awsBedrockConverseSystem() {
+ return "CamelAwsBedrockConverseSystem";
+ }
+ /**
+ * The inference configuration for Converse API.
+ *
+ * The option is a: {@code InferenceConfiguration} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code
+ * AwsBedrockConverseInferenceConfig}.
+ */
+ public String awsBedrockConverseInferenceConfig() {
+ return "CamelAwsBedrockConverseInferenceConfig";
+ }
+ /**
+ * The tool configuration for Converse API.
+ *
+ * The option is a: {@code ToolConfiguration} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code AwsBedrockConverseToolConfig}.
+ */
+ public String awsBedrockConverseToolConfig() {
+ return "CamelAwsBedrockConverseToolConfig";
+ }
+ /**
+ * The additional model request fields for Converse API.
+ *
+ * The option is a: {@code
+ * software.amazon.awssdk.core.document.Document} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code
+ * AwsBedrockConverseAdditionalFields}.
+ */
+ public String awsBedrockConverseAdditionalFields() {
+ return "CamelAwsBedrockConverseAdditionalFields";
+ }
+ /**
+ * The stop reason from Converse API response.
+ *
+ * The option is a: {@code String} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code AwsBedrockConverseStopReason}.
+ */
+ public String awsBedrockConverseStopReason() {
+ return "CamelAwsBedrockConverseStopReason";
+ }
+ /**
+ * The usage metrics from Converse API response.
+ *
+ * The option is a: {@code TokenUsage} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code AwsBedrockConverseUsage}.
+ */
+ public String awsBedrockConverseUsage() {
+ return "CamelAwsBedrockConverseUsage";
+ }
+ /**
+ * The output message from Converse API response.
+ *
+ * The option is a: {@code Message} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code
+ * AwsBedrockConverseOutputMessage}.
+ */
+ public String awsBedrockConverseOutputMessage() {
+ return "CamelAwsBedrockConverseOutputMessage";
+ }
}
static BedrockEndpointBuilder endpointBuilder(String componentName, String
path) {
class BedrockEndpointBuilderImpl extends AbstractEndpointBuilder
implements BedrockEndpointBuilder, AdvancedBedrockEndpointBuilder {