This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-22622 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 40c06750fde3ef558ad3634ced5ae290a84429b8 Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Oct 29 12:46:35 2025 +0100 CAMEL-22622 - Camel-AWS-Bedrock: Support Converse API Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/catalog/components/aws-bedrock.json | 14 +- .../aws2/bedrock/runtime/aws-bedrock.json | 14 +- .../aws2/bedrock/runtime/BedrockConstants.java | 17 ++ .../aws2/bedrock/runtime/BedrockOperations.java | 6 +- .../aws2/bedrock/runtime/BedrockProducer.java | 239 +++++++++++++++++++++ .../runtime/stream/ConverseStreamHandler.java | 172 +++++++++++++++ .../runtime/integration/BedrockProducerIT.java | 71 ++++++ .../integration/BedrockProducerStreamingIT.java | 4 +- .../runtime/stream/ConverseStreamHandlerTest.java | 79 +++++++ .../dsl/BedrockEndpointBuilderFactory.java | 100 +++++++++ 10 files changed, 706 insertions(+), 10 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json index 4b75e1b80f4f..c392de4f2189 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws-bedrock.json @@ -28,7 +28,7 @@ "includeStreamingMetadata": { "index": 1, "kind": "property", "displayName": "Include Streaming Metadata", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Whether to include streaming metadata in the response hea [...] "lazyStartProducer": { "index": 2, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail [...] "modelId": { "index": 3, "kind": "property", "displayName": "Model Id", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "java.lang.String", "enum": [ "amazon.titan-text-express-v1", "amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1", "amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1", "amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0", "amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0", "amazon.nova-lite-v [...] - "operation": { "index": 4, "kind": "property", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationCl [...] + "operation": { "index": 4, "kind": "property", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated": false, "deprecationNote": "", "autowired": false, "se [...] "overrideEndpoint": { "index": 5, "kind": "property", "displayName": "Override Endpoint", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Set the need for overriding the endpoint. This option needs to be used in [...] "pojoRequest": { "index": 6, "kind": "property", "displayName": "Pojo Request", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If we want to use a POJO request as body or not" }, "profileCredentialsName": { "index": 7, "kind": "property", "displayName": "Profile Credentials Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If using a profile credentials provider, this para [...] @@ -58,13 +58,21 @@ "CamelAwsBedrockStreamOutputMode": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The streaming output mode (complete or chunks)", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAM_OUTPUT_MODE" }, "CamelAwsBedrockCompletionReason": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The completion reason for streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_COMPLETION_REASON" }, "CamelAwsBedrockTokenCount": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of tokens generated in streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_TOKEN_COUNT" }, - "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of chunks received in streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT" } + "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of chunks received in streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT" }, + "CamelAwsBedrockConverseMessages": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "List<Message>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The conversation messages for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_MESSAGES" }, + "CamelAwsBedrockConverseSystem": { "index": 8, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "List<SystemContentBlock>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The system prompts for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_SYSTEM" }, + "CamelAwsBedrockConverseInferenceConfig": { "index": 9, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "InferenceConfiguration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The inference configuration for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_INFERENCE_CONFIG" }, + "CamelAwsBedrockConverseToolConfig": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "ToolConfiguration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The tool configuration for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_TOOL_CONFIG" }, + "CamelAwsBedrockConverseAdditionalFields": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.core.document.Document", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The additional model request fields for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS" }, + "CamelAwsBedrockConverseStopReason": { "index": 12, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The stop reason from Converse API response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_STOP_REASON" }, + "CamelAwsBedrockConverseUsage": { "index": 13, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "TokenUsage", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The usage metrics from Converse API response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_USAGE" }, + "CamelAwsBedrockConverseOutputMessage": { "index": 14, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Message", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The output message from Converse API response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_OUTPUT_MESSAGE" } }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Logical name" }, "includeStreamingMetadata": { "index": 1, "kind": "parameter", "displayName": "Include Streaming Metadata", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Whether to include streaming metadata in the response he [...] "modelId": { "index": 2, "kind": "parameter", "displayName": "Model Id", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "java.lang.String", "enum": [ "amazon.titan-text-express-v1", "amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1", "amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1", "amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0", "amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0", "amazon.nova-lite- [...] - "operation": { "index": 3, "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationC [...] + "operation": { "index": 3, "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated": false, "deprecationNote": "", "autowired": false, "s [...] "overrideEndpoint": { "index": 4, "kind": "parameter", "displayName": "Override Endpoint", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Set the need for overriding the endpoint. This option needs to be used i [...] "pojoRequest": { "index": 5, "kind": "parameter", "displayName": "Pojo Request", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If we want to use a POJO request as body or not" }, "profileCredentialsName": { "index": 6, "kind": "parameter", "displayName": "Profile Credentials Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If using a profile credentials provider, this par [...] diff --git a/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json b/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json index 4b75e1b80f4f..c392de4f2189 100644 --- a/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json +++ b/components/camel-aws/camel-aws-bedrock/src/generated/resources/META-INF/org/apache/camel/component/aws2/bedrock/runtime/aws-bedrock.json @@ -28,7 +28,7 @@ "includeStreamingMetadata": { "index": 1, "kind": "property", "displayName": "Include Streaming Metadata", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Whether to include streaming metadata in the response hea [...] "lazyStartProducer": { "index": 2, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail [...] "modelId": { "index": 3, "kind": "property", "displayName": "Model Id", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "java.lang.String", "enum": [ "amazon.titan-text-express-v1", "amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1", "amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1", "amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0", "amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0", "amazon.nova-lite-v [...] - "operation": { "index": 4, "kind": "property", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationCl [...] + "operation": { "index": 4, "kind": "property", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated": false, "deprecationNote": "", "autowired": false, "se [...] "overrideEndpoint": { "index": 5, "kind": "property", "displayName": "Override Endpoint", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Set the need for overriding the endpoint. This option needs to be used in [...] "pojoRequest": { "index": 6, "kind": "property", "displayName": "Pojo Request", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If we want to use a POJO request as body or not" }, "profileCredentialsName": { "index": 7, "kind": "property", "displayName": "Profile Credentials Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If using a profile credentials provider, this para [...] @@ -58,13 +58,21 @@ "CamelAwsBedrockStreamOutputMode": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The streaming output mode (complete or chunks)", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAM_OUTPUT_MODE" }, "CamelAwsBedrockCompletionReason": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The completion reason for streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_COMPLETION_REASON" }, "CamelAwsBedrockTokenCount": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of tokens generated in streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_TOKEN_COUNT" }, - "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of chunks received in streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT" } + "CamelAwsBedrockChunkCount": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of chunks received in streaming response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#STREAMING_CHUNK_COUNT" }, + "CamelAwsBedrockConverseMessages": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "List<Message>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The conversation messages for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_MESSAGES" }, + "CamelAwsBedrockConverseSystem": { "index": 8, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "List<SystemContentBlock>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The system prompts for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_SYSTEM" }, + "CamelAwsBedrockConverseInferenceConfig": { "index": 9, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "InferenceConfiguration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The inference configuration for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_INFERENCE_CONFIG" }, + "CamelAwsBedrockConverseToolConfig": { "index": 10, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "ToolConfiguration", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The tool configuration for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_TOOL_CONFIG" }, + "CamelAwsBedrockConverseAdditionalFields": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.core.document.Document", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The additional model request fields for Converse API", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS" }, + "CamelAwsBedrockConverseStopReason": { "index": 12, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The stop reason from Converse API response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_STOP_REASON" }, + "CamelAwsBedrockConverseUsage": { "index": 13, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "TokenUsage", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The usage metrics from Converse API response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_USAGE" }, + "CamelAwsBedrockConverseOutputMessage": { "index": 14, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Message", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The output message from Converse API response", "constantName": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants#CONVERSE_OUTPUT_MESSAGE" } }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Logical name" }, "includeStreamingMetadata": { "index": 1, "kind": "parameter", "displayName": "Include Streaming Metadata", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Whether to include streaming metadata in the response he [...] "modelId": { "index": 2, "kind": "parameter", "displayName": "Model Id", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "java.lang.String", "enum": [ "amazon.titan-text-express-v1", "amazon.titan-text-lite-v1", "amazon.titan-image-generator-v1", "amazon.titan-embed-text-v1", "amazon.titan-embed-image-v1", "amazon.titan-text-premier-v1:0", "amazon.titan-embed-text-v2:0", "amazon.titan-image-generator-v2:0", "amazon.nova-canvas-v1:0", "amazon.nova-lite- [...] - "operation": { "index": 3, "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationC [...] + "operation": { "index": 3, "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": true, "type": "enum", "javaType": "org.apache.camel.component.aws2.bedrock.runtime.BedrockOperations", "enum": [ "invokeTextModel", "invokeImageModel", "invokeEmbeddingsModel", "invokeTextModelStreaming", "invokeImageModelStreaming", "invokeEmbeddingsModelStreaming", "converse", "converseStream" ], "deprecated": false, "deprecationNote": "", "autowired": false, "s [...] "overrideEndpoint": { "index": 4, "kind": "parameter", "displayName": "Override Endpoint", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "Set the need for overriding the endpoint. This option needs to be used i [...] "pojoRequest": { "index": 5, "kind": "parameter", "displayName": "Pojo Request", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If we want to use a POJO request as body or not" }, "profileCredentialsName": { "index": 6, "kind": "parameter", "displayName": "Profile Credentials Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.aws2.bedrock.runtime.BedrockConfiguration", "configurationField": "configuration", "description": "If using a profile credentials provider, this par [...] diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java index f0df52b81a4b..09929fe76515 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockConstants.java @@ -36,4 +36,21 @@ public interface BedrockConstants { String STREAMING_TOKEN_COUNT = "CamelAwsBedrockTokenCount"; @Metadata(description = "The number of chunks received in streaming response", javaType = "Integer") String STREAMING_CHUNK_COUNT = "CamelAwsBedrockChunkCount"; + @Metadata(description = "The conversation messages for Converse API", javaType = "List<Message>") + String CONVERSE_MESSAGES = "CamelAwsBedrockConverseMessages"; + @Metadata(description = "The system prompts for Converse API", javaType = "List<SystemContentBlock>") + String CONVERSE_SYSTEM = "CamelAwsBedrockConverseSystem"; + @Metadata(description = "The inference configuration for Converse API", javaType = "InferenceConfiguration") + String CONVERSE_INFERENCE_CONFIG = "CamelAwsBedrockConverseInferenceConfig"; + @Metadata(description = "The tool configuration for Converse API", javaType = "ToolConfiguration") + String CONVERSE_TOOL_CONFIG = "CamelAwsBedrockConverseToolConfig"; + @Metadata(description = "The additional model request fields for Converse API", + javaType = "software.amazon.awssdk.core.document.Document") + String CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS = "CamelAwsBedrockConverseAdditionalFields"; + @Metadata(description = "The stop reason from Converse API response", javaType = "String") + String CONVERSE_STOP_REASON = "CamelAwsBedrockConverseStopReason"; + @Metadata(description = "The usage metrics from Converse API response", javaType = "TokenUsage") + String CONVERSE_USAGE = "CamelAwsBedrockConverseUsage"; + @Metadata(description = "The output message from Converse API response", javaType = "Message") + String CONVERSE_OUTPUT_MESSAGE = "CamelAwsBedrockConverseOutputMessage"; } diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java index b541f6e4c206..079c82fd0497 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockOperations.java @@ -28,5 +28,9 @@ public enum BedrockOperations { invokeImageModelStreaming, - invokeEmbeddingsModelStreaming + invokeEmbeddingsModelStreaming, + + converse, + + converseStream } diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java index 7ee309ea8983..001ed6bc0c4e 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/BedrockProducer.java @@ -35,9 +35,16 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClient; +import software.amazon.awssdk.services.bedrockruntime.model.ContentBlock; +import software.amazon.awssdk.services.bedrockruntime.model.ConverseRequest; +import software.amazon.awssdk.services.bedrockruntime.model.ConverseResponse; +import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamRequest; +import software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelRequest; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelResponse; import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamRequest; +import software.amazon.awssdk.services.bedrockruntime.model.SystemContentBlock; +import software.amazon.awssdk.services.bedrockruntime.model.ToolConfiguration; /** * A Producer which sends messages to the Amazon Bedrock Service <a href="http://aws.amazon.com/bedrock/">AWS @@ -73,6 +80,12 @@ public class BedrockProducer extends DefaultProducer { case invokeEmbeddingsModelStreaming: invokeEmbeddingsModelStreaming(getEndpoint().getBedrockRuntimeClient(), exchange); break; + case converse: + converse(getEndpoint().getBedrockRuntimeClient(), exchange); + break; + case converseStream: + converseStream(exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -506,6 +519,232 @@ public class BedrockProducer extends DefaultProducer { message.setHeader(BedrockConstants.STREAMING_CHUNK_COUNT, metadata.getChunkCount()); } + private void converse(BedrockRuntimeClient bedrockRuntimeClient, Exchange exchange) throws InvalidPayloadException { + ConverseRequest request; + + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getMessage().getMandatoryBody(); + if (payload instanceof ConverseRequest) { + request = (ConverseRequest) payload; + } else { + throw new IllegalArgumentException( + "Converse operation requires ConverseRequest in POJO mode"); + } + } else { + // Build request from headers and body + ConverseRequest.Builder builder = ConverseRequest.builder(); + + // Set model ID + builder.modelId(getConfiguration().getModelId()); + + // Get messages from header or body + @SuppressWarnings("unchecked") + List<software.amazon.awssdk.services.bedrockruntime.model.Message> messages + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_MESSAGES, List.class); + if (messages != null) { + builder.messages(messages); + } else { + throw new IllegalArgumentException( + "Converse operation requires messages in header " + BedrockConstants.CONVERSE_MESSAGES); + } + + // Optional: System prompts + @SuppressWarnings("unchecked") + List<SystemContentBlock> system + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_SYSTEM, List.class); + if (system != null) { + builder.system(system); + } + + // Optional: Inference configuration + InferenceConfiguration inferenceConfig + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG, InferenceConfiguration.class); + if (inferenceConfig != null) { + builder.inferenceConfig(inferenceConfig); + } + + // Optional: Tool configuration + ToolConfiguration toolConfig + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_TOOL_CONFIG, ToolConfiguration.class); + if (toolConfig != null) { + builder.toolConfig(toolConfig); + } + + // Optional: Additional model request fields + software.amazon.awssdk.core.document.Document additionalFields = exchange.getMessage() + .getHeader(BedrockConstants.CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS, + software.amazon.awssdk.core.document.Document.class); + if (additionalFields != null) { + builder.additionalModelRequestFields(additionalFields); + } + + request = builder.build(); + } + + try { + ConverseResponse response = bedrockRuntimeClient.converse(request); + + org.apache.camel.Message message = getMessageForResponse(exchange); + + // Set the output message content as body + if (response.output() != null && response.output().message() != null) { + software.amazon.awssdk.services.bedrockruntime.model.Message outputMessage = response.output().message(); + message.setHeader(BedrockConstants.CONVERSE_OUTPUT_MESSAGE, outputMessage); + + // Extract text content from the message + StringBuilder textContent = new StringBuilder(); + for (ContentBlock content : outputMessage.content()) { + if (content.text() != null) { + textContent.append(content.text()); + } + } + message.setBody(textContent.toString()); + } + + // Set metadata headers + if (response.stopReason() != null) { + message.setHeader(BedrockConstants.CONVERSE_STOP_REASON, response.stopReason().toString()); + } + if (response.usage() != null) { + message.setHeader(BedrockConstants.CONVERSE_USAGE, response.usage()); + } + + } catch (AwsServiceException ase) { + LOG.trace("Converse command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + } + + private void converseStream(Exchange exchange) throws InvalidPayloadException { + ConverseStreamRequest request; + + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getMessage().getMandatoryBody(); + if (payload instanceof ConverseStreamRequest) { + request = (ConverseStreamRequest) payload; + } else { + throw new IllegalArgumentException( + "ConverseStream operation requires ConverseStreamRequest in POJO mode"); + } + } else { + // Build request from headers and body + ConverseStreamRequest.Builder builder = ConverseStreamRequest.builder(); + + // Set model ID + builder.modelId(getConfiguration().getModelId()); + + // Get messages from header or body + @SuppressWarnings("unchecked") + List<software.amazon.awssdk.services.bedrockruntime.model.Message> messages + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_MESSAGES, List.class); + if (messages != null) { + builder.messages(messages); + } else { + throw new IllegalArgumentException( + "ConverseStream operation requires messages in header " + BedrockConstants.CONVERSE_MESSAGES); + } + + // Optional: System prompts + @SuppressWarnings("unchecked") + List<SystemContentBlock> system + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_SYSTEM, List.class); + if (system != null) { + builder.system(system); + } + + // Optional: Inference configuration + InferenceConfiguration inferenceConfig + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG, InferenceConfiguration.class); + if (inferenceConfig != null) { + builder.inferenceConfig(inferenceConfig); + } + + // Optional: Tool configuration + ToolConfiguration toolConfig + = exchange.getMessage().getHeader(BedrockConstants.CONVERSE_TOOL_CONFIG, ToolConfiguration.class); + if (toolConfig != null) { + builder.toolConfig(toolConfig); + } + + // Optional: Additional model request fields + software.amazon.awssdk.core.document.Document additionalFields = exchange.getMessage() + .getHeader(BedrockConstants.CONVERSE_ADDITIONAL_MODEL_REQUEST_FIELDS, + software.amazon.awssdk.core.document.Document.class); + if (additionalFields != null) { + builder.additionalModelRequestFields(additionalFields); + } + + request = builder.build(); + } + + processConverseStreamingRequest(request, exchange); + } + + private void processConverseStreamingRequest(ConverseStreamRequest request, Exchange exchange) { + try { + String streamOutputMode = getConfiguration().getStreamOutputMode(); + if (streamOutputMode == null) { + streamOutputMode = "complete"; + } + + // Check if mode is overridden in headers + if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockConstants.STREAM_OUTPUT_MODE))) { + streamOutputMode = exchange.getIn().getHeader(BedrockConstants.STREAM_OUTPUT_MODE, String.class); + } + + org.apache.camel.Message message = getMessageForResponse(exchange); + org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.StreamMetadata metadata + = new org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.StreamMetadata(); + + if ("chunks".equals(streamOutputMode)) { + // Chunks mode - emit each chunk as separate message + List<String> allChunks = new ArrayList<>(); + getEndpoint().getBedrockRuntimeAsyncClient().converseStream( + request, + org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.createChunksHandler( + metadata, + allChunks, + null)) + .join(); + + message.setBody(allChunks); + if (getConfiguration().isIncludeStreamingMetadata()) { + setConverseStreamingMetadata(message, metadata); + } + } else { + // Complete mode - accumulate all chunks and return complete response + StringBuilder fullText = new StringBuilder(); + getEndpoint().getBedrockRuntimeAsyncClient().converseStream( + request, + org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.createCompleteHandler( + metadata, + fullText)) + .join(); + + message.setBody(fullText.toString()); + if (getConfiguration().isIncludeStreamingMetadata()) { + setConverseStreamingMetadata(message, metadata); + } + } + + } catch (AwsServiceException ase) { + LOG.trace("Converse Stream command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + } + + private void setConverseStreamingMetadata( + org.apache.camel.Message message, + org.apache.camel.component.aws2.bedrock.runtime.stream.ConverseStreamHandler.StreamMetadata metadata) { + if (metadata.getStopReason() != null) { + message.setHeader(BedrockConstants.CONVERSE_STOP_REASON, metadata.getStopReason()); + } + if (metadata.getUsage() != null) { + message.setHeader(BedrockConstants.CONVERSE_USAGE, metadata.getUsage()); + } + message.setHeader(BedrockConstants.STREAMING_CHUNK_COUNT, metadata.getChunkCount()); + } + public static Message getMessageForResponse(final Exchange exchange) { return exchange.getMessage(); } diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandler.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandler.java new file mode 100644 index 000000000000..1b242d5481a5 --- /dev/null +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandler.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.bedrock.runtime.stream; + +import java.util.List; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler; +import software.amazon.awssdk.services.bedrockruntime.model.TokenUsage; + +/** + * Utility class for handling Converse API streaming responses from Bedrock models + */ +public final class ConverseStreamHandler { + + private static final Logger LOG = LoggerFactory.getLogger(ConverseStreamHandler.class); + + private ConverseStreamHandler() { + // Utility class + } + + /** + * Create a response handler for complete mode (accumulates all chunks) + * + * @param metadata the metadata object to populate + * @param fullText the string builder to accumulate text + * @return the response handler + */ + public static ConverseStreamResponseHandler createCompleteHandler( + StreamMetadata metadata, + StringBuilder fullText) { + + int[] chunkCount = { 0 }; + + return ConverseStreamResponseHandler.builder() + .subscriber(ConverseStreamResponseHandler.Visitor.builder() + .onContentBlockDelta(delta -> { + if (delta.delta() != null && delta.delta().text() != null) { + fullText.append(delta.delta().text()); + } + chunkCount[0]++; + }) + .onMetadata(metadataEvent -> { + if (metadataEvent.usage() != null) { + metadata.setUsage(metadataEvent.usage()); + } + }) + .onMessageStop(stop -> { + if (stop.stopReason() != null) { + metadata.setStopReason(stop.stopReason().toString()); + } + }) + .build()) + .onComplete(() -> { + metadata.setChunkCount(chunkCount[0]); + metadata.setFullText(fullText.toString()); + }) + .build(); + } + + /** + * Create a response handler for chunks mode (emits each chunk) + * + * @param metadata the metadata object to populate + * @param chunks the list to collect chunks + * @param chunkConsumer consumer that receives each chunk + * @return the response handler + */ + public static ConverseStreamResponseHandler createChunksHandler( + StreamMetadata metadata, + List<String> chunks, + Consumer<String> chunkConsumer) { + + int[] chunkCount = { 0 }; + + return ConverseStreamResponseHandler.builder() + .subscriber(ConverseStreamResponseHandler.Visitor.builder() + .onContentBlockDelta(delta -> { + if (delta.delta() != null && delta.delta().text() != null) { + String text = delta.delta().text(); + chunks.add(text); + if (chunkConsumer != null) { + chunkConsumer.accept(text); + } + } + chunkCount[0]++; + }) + .onMetadata(metadataEvent -> { + if (metadataEvent.usage() != null) { + metadata.setUsage(metadataEvent.usage()); + } + }) + .onMessageStop(stop -> { + if (stop.stopReason() != null) { + metadata.setStopReason(stop.stopReason().toString()); + } + }) + .build()) + .onComplete(() -> { + metadata.setChunkCount(chunkCount[0]); + metadata.setChunks(chunks); + }) + .build(); + } + + /** + * Metadata extracted from Converse streaming response + */ + public static class StreamMetadata { + private String fullText; + private List<String> chunks; + private String stopReason; + private TokenUsage usage; + private int chunkCount; + + public String getFullText() { + return fullText; + } + + public void setFullText(String fullText) { + this.fullText = fullText; + } + + public List<String> getChunks() { + return chunks; + } + + public void setChunks(List<String> chunks) { + this.chunks = chunks; + } + + public String getStopReason() { + return stopReason; + } + + public void setStopReason(String stopReason) { + this.stopReason = stopReason; + } + + public TokenUsage getUsage() { + return usage; + } + + public void setUsage(TokenUsage usage) { + this.usage = usage; + } + + public int getChunkCount() { + return chunkCount; + } + + public void setChunkCount(int chunkCount) { + this.chunkCount = chunkCount; + } + } +} diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java index ce51d3f7ef1d..6455c673a2b8 100644 --- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java +++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerIT.java @@ -26,6 +26,7 @@ import org.apache.camel.component.aws2.bedrock.BedrockModels; import org.apache.camel.component.aws2.bedrock.runtime.BedrockConstants; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.condition.EnabledIfSystemProperties; @@ -45,6 +46,11 @@ class BedrockProducerIT extends CamelTestSupport { @EndpointInject("mock:result") private MockEndpoint result; + @BeforeEach + public void resetMocks() { + result.reset(); + } + @Test public void testInvokeTitanExpressModel() throws InterruptedException { @@ -867,6 +873,59 @@ class BedrockProducerIT extends CamelTestSupport { MockEndpoint.assertIsSatisfied(context); } + @Test + public void testConverseWithClaudeModel() throws InterruptedException { + result.expectedMessageCount(1); + final Exchange result = template.send("direct:converse_claude", exchange -> { + // Create a message using the Converse API + java.util.List<software.amazon.awssdk.services.bedrockruntime.model.Message> messages = new java.util.ArrayList<>(); + messages.add(software.amazon.awssdk.services.bedrockruntime.model.Message.builder() + .role(software.amazon.awssdk.services.bedrockruntime.model.ConversationRole.USER) + .content(software.amazon.awssdk.services.bedrockruntime.model.ContentBlock + .fromText("What is the capital of France?")) + .build()); + + exchange.getMessage().setHeader(BedrockConstants.CONVERSE_MESSAGES, messages); + + // Optional: Add inference configuration + software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration inferenceConfig + = software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration.builder() + .maxTokens(100) + .temperature(0.7f) + .build(); + exchange.getMessage().setHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG, inferenceConfig); + }); + + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testConverseStreamWithClaudeModel() throws InterruptedException { + result.expectedMessageCount(1); + final Exchange result = template.send("direct:converse_stream_claude", exchange -> { + // Create a message using the Converse API + java.util.List<software.amazon.awssdk.services.bedrockruntime.model.Message> messages = new java.util.ArrayList<>(); + messages.add(software.amazon.awssdk.services.bedrockruntime.model.Message.builder() + .role(software.amazon.awssdk.services.bedrockruntime.model.ConversationRole.USER) + .content(software.amazon.awssdk.services.bedrockruntime.model.ContentBlock + .fromText("Tell me a short joke about Java programming")) + .build()); + + exchange.getMessage().setHeader(BedrockConstants.CONVERSE_MESSAGES, messages); + exchange.getMessage().setHeader(BedrockConstants.STREAM_OUTPUT_MODE, "complete"); + + // Optional: Add inference configuration + software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration inferenceConfig + = software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration.builder() + .maxTokens(200) + .temperature(0.9f) + .build(); + exchange.getMessage().setHeader(BedrockConstants.CONVERSE_INFERENCE_CONFIG, inferenceConfig); + }); + + MockEndpoint.assertIsSatisfied(context); + } + @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @@ -1057,6 +1116,18 @@ class BedrockProducerIT extends CamelTestSupport { + BedrockModels.MISTRAL_SMALL_2402.model) .log("Completions: ${body}") .to(result); + + from("direct:converse_claude") + .to("aws-bedrock:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}})®ion=us-east-1&operation=converse&modelId=" + + BedrockModels.ANTROPHIC_CLAUDE_V3.model) + .log("Converse response: ${body}") + .to(result); + + from("direct:converse_stream_claude") + .to("aws-bedrock:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}})®ion=us-east-1&operation=converseStream&modelId=" + + BedrockModels.ANTROPHIC_CLAUDE_V3.model) + .log("Converse stream response: ${body}") + .to(result); } }; } diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java index 6efe1bc9a7db..62e189dbe5b8 100644 --- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java +++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/integration/BedrockProducerStreamingIT.java @@ -61,10 +61,8 @@ class BedrockProducerStreamingIT extends CamelTestSupport { private MockEndpoint result; @BeforeEach - public void resetMocks() throws InterruptedException { + public void resetMocks() { result.reset(); - // Add small delay to avoid AWS rate limiting between tests - Thread.sleep(1000); } @Test diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandlerTest.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandlerTest.java new file mode 100644 index 000000000000..a07aa1a728ee --- /dev/null +++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/runtime/stream/ConverseStreamHandlerTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.bedrock.runtime.stream; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class ConverseStreamHandlerTest { + + @Test + void testCreateCompleteHandler() { + ConverseStreamHandler.StreamMetadata metadata = new ConverseStreamHandler.StreamMetadata(); + StringBuilder fullText = new StringBuilder(); + + ConverseStreamResponseHandler handler = ConverseStreamHandler.createCompleteHandler(metadata, fullText); + + assertNotNull(handler, "Handler should not be null"); + assertNotNull(metadata, "Metadata should not be null"); + assertNotNull(fullText, "Full text builder should not be null"); + } + + @Test + void testCreateChunksHandler() { + ConverseStreamHandler.StreamMetadata metadata = new ConverseStreamHandler.StreamMetadata(); + List<String> chunks = new ArrayList<>(); + + ConverseStreamResponseHandler handler + = ConverseStreamHandler.createChunksHandler(metadata, chunks, null); + + assertNotNull(handler, "Handler should not be null"); + assertNotNull(metadata, "Metadata should not be null"); + assertNotNull(chunks, "Chunks list should not be null"); + } + + @Test + void testStreamMetadata() { + ConverseStreamHandler.StreamMetadata metadata = new ConverseStreamHandler.StreamMetadata(); + + // Test setting and getting fullText + metadata.setFullText("Test response"); + assertEquals("Test response", metadata.getFullText()); + + // Test setting and getting chunks + List<String> chunks = List.of("chunk1", "chunk2"); + metadata.setChunks(chunks); + assertEquals(chunks, metadata.getChunks()); + + // Test setting and getting stopReason + metadata.setStopReason("end_turn"); + assertEquals("end_turn", metadata.getStopReason()); + + // Test setting and getting chunkCount + metadata.setChunkCount(5); + assertEquals(5, metadata.getChunkCount()); + + // Test usage is null initially + assertEquals(null, metadata.getUsage()); + } +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java index aca85d17480b..8cdb93811d2f 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/BedrockEndpointBuilderFactory.java @@ -772,6 +772,106 @@ public interface BedrockEndpointBuilderFactory { public String awsBedrockChunkCount() { return "CamelAwsBedrockChunkCount"; } + /** + * The conversation messages for Converse API. + * + * The option is a: {@code List<Message>} type. + * + * Group: producer + * + * @return the name of the header {@code AwsBedrockConverseMessages}. + */ + public String awsBedrockConverseMessages() { + return "CamelAwsBedrockConverseMessages"; + } + /** + * The system prompts for Converse API. + * + * The option is a: {@code List<SystemContentBlock>} type. + * + * Group: producer + * + * @return the name of the header {@code AwsBedrockConverseSystem}. + */ + public String awsBedrockConverseSystem() { + return "CamelAwsBedrockConverseSystem"; + } + /** + * The inference configuration for Converse API. + * + * The option is a: {@code InferenceConfiguration} type. + * + * Group: producer + * + * @return the name of the header {@code + * AwsBedrockConverseInferenceConfig}. + */ + public String awsBedrockConverseInferenceConfig() { + return "CamelAwsBedrockConverseInferenceConfig"; + } + /** + * The tool configuration for Converse API. + * + * The option is a: {@code ToolConfiguration} type. + * + * Group: producer + * + * @return the name of the header {@code AwsBedrockConverseToolConfig}. + */ + public String awsBedrockConverseToolConfig() { + return "CamelAwsBedrockConverseToolConfig"; + } + /** + * The additional model request fields for Converse API. + * + * The option is a: {@code + * software.amazon.awssdk.core.document.Document} type. + * + * Group: producer + * + * @return the name of the header {@code + * AwsBedrockConverseAdditionalFields}. + */ + public String awsBedrockConverseAdditionalFields() { + return "CamelAwsBedrockConverseAdditionalFields"; + } + /** + * The stop reason from Converse API response. + * + * The option is a: {@code String} type. + * + * Group: producer + * + * @return the name of the header {@code AwsBedrockConverseStopReason}. + */ + public String awsBedrockConverseStopReason() { + return "CamelAwsBedrockConverseStopReason"; + } + /** + * The usage metrics from Converse API response. + * + * The option is a: {@code TokenUsage} type. + * + * Group: producer + * + * @return the name of the header {@code AwsBedrockConverseUsage}. + */ + public String awsBedrockConverseUsage() { + return "CamelAwsBedrockConverseUsage"; + } + /** + * The output message from Converse API response. + * + * The option is a: {@code Message} type. + * + * Group: producer + * + * @return the name of the header {@code + * AwsBedrockConverseOutputMessage}. + */ + public String awsBedrockConverseOutputMessage() { + return "CamelAwsBedrockConverseOutputMessage"; + } } static BedrockEndpointBuilder endpointBuilder(String componentName, String path) { class BedrockEndpointBuilderImpl extends AbstractEndpointBuilder implements BedrockEndpointBuilder, AdvancedBedrockEndpointBuilder {
