This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-23238 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 61e06a0208a233ac5f3cad58644a8917419695f6 Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Mar 23 18:43:37 2026 +0100 CAMEL-23238 - Camel-Google-VertexAI: Implement streaming, image generation, embeddings, multimodal, and streamRawPredict operations Implement all 5 placeholder operations in GoogleVertexAIProducer: - generateChatStreaming: uses generateContentStream for streaming Gemini responses - generateImage: uses generateImages for Imagen model image generation - generateEmbeddings: uses embedContent for single/batch text embeddings - generateMultimodal: uses Content/Part for text+image/video/audio input - streamRawPredict: uses streamRawPredictCallable for streaming partner models Added 10 new header constants for operation-specific parameters. Added 7 unit tests and 5 integration tests. Updated component documentation with examples for all new operations. Signed-off-by: Andrea Cosentino <[email protected]> --- .../component/google/vertexai/google-vertexai.json | 16 +- .../src/main/docs/google-vertexai-component.adoc | 88 ++++++- .../google/vertexai/GoogleVertexAIConstants.java | 48 ++++ .../google/vertexai/GoogleVertexAIProducer.java | 289 ++++++++++++++++++++- .../GoogleVertexAIProducerOperationsTest.java | 107 ++++++++ .../integration/GoogleVertexAINewOperationsIT.java | 141 ++++++++++ 6 files changed, 672 insertions(+), 17 deletions(-) diff --git a/components/camel-google/camel-google-vertexai/src/generated/resources/META-INF/org/apache/camel/component/google/vertexai/google-vertexai.json b/components/camel-google/camel-google-vertexai/src/generated/resources/META-INF/org/apache/camel/component/google/vertexai/google-vertexai.json index a4dd00562e06..518c6cf329e6 100644 --- a/components/camel-google/camel-google-vertexai/src/generated/resources/META-INF/org/apache/camel/component/google/vertexai/google-vertexai.json +++ 
b/components/camel-google/camel-google-vertexai/src/generated/resources/META-INF/org/apache/camel/component/google/vertexai/google-vertexai.json @@ -64,10 +64,18 @@ "CamelGoogleVertexAITotalTokenCount": { "index": 17, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The total token count (prompt response)", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#TOTAL_TOKEN_COUNT" }, "CamelGoogleVertexAISafetyRatings": { "index": 18, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "java.util.List<com.google.cloud.aiplatform.v1.SafetyRating>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The safety ratings from the response", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#SAFETY_RATINGS" }, "CamelGoogleVertexAIContentBlocked": { "index": 19, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the content was blocked by safety filters", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#CONTENT_BLOCKED" }, - "CamelGoogleVertexAIChunkCount": { "index": 20, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of chunks received in streaming response", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#STREAMING_CHUNK_COUNT" }, - "CamelGoogleVertexAIPublisher": { "index": 21, "kind": "header", "displayName": "", "group": "producer", "label": 
"producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Publisher name for partner models (e.g., anthropic, meta, mistralai)", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#PUBLISHER" }, - "CamelGoogleVertexAIRawResponse": { "index": 22, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The raw JSON response from rawPredict operation", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#RAW_RESPONSE" }, - "CamelGoogleVertexAIAnthropicVersion": { "index": 23, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Anthropic API version for Claude models", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#ANTHROPIC_VERSION" } + "CamelGoogleVertexAIChunkCount": { "index": 21, "kind": "header", "displayName": "", "group": "producer generateChatStreaming", "label": "producer generateChatStreaming", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of streaming chunks received", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#CHUNK_COUNT" }, + "CamelGoogleVertexAIImageNumberOfImages": { "index": 22, "kind": "header", "displayName": "", "group": "producer generateImage", "label": "producer generateImage", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of images to generate", "constantName": 
"org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#IMAGE_NUMBER_OF_IMAGES" }, + "CamelGoogleVertexAIImageAspectRatio": { "index": 23, "kind": "header", "displayName": "", "group": "producer generateImage", "label": "producer generateImage", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The aspect ratio for generated images (e.g., 1:1, 16:9, 9:16, 3:4, 4:3)", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#IMAGE_ASPECT_RATIO" }, + "CamelGoogleVertexAIGeneratedImages": { "index": 24, "kind": "header", "displayName": "", "group": "generateImage", "label": "generateImage", "required": false, "javaType": "java.util.List<com.google.genai.types.Image>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The generated images from an image generation operation", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#GENERATED_IMAGES" }, + "CamelGoogleVertexAIEmbeddingTaskType": { "index": 25, "kind": "header", "displayName": "", "group": "producer generateEmbeddings", "label": "producer generateEmbeddings", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The task type for embeddings (e.g., RETRIEVAL_QUERY, RETRIEVAL_DOCUMENT, SEMANTIC_SIMILARITY, CLASSIFICATION, CLUSTERING, QUESTION_ANSWERING, FACT_VERIFICATION)", "constantName": [...] 
+ "CamelGoogleVertexAIEmbeddingOutputDimensionality": { "index": 26, "kind": "header", "displayName": "", "group": "producer generateEmbeddings", "label": "producer generateEmbeddings", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The desired output dimensionality for embeddings", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#EMBEDDING_OUTPUT_DIMENSIONALITY" }, + "CamelGoogleVertexAIMediaData": { "index": 27, "kind": "header", "displayName": "", "group": "producer generateMultimodal", "label": "producer generateMultimodal", "required": false, "javaType": "byte[]", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The media data bytes for multimodal input", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#MEDIA_DATA" }, + "CamelGoogleVertexAIMediaMimeType": { "index": 28, "kind": "header", "displayName": "", "group": "producer generateMultimodal", "label": "producer generateMultimodal", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The MIME type of the media data (e.g., image\/png, image\/jpeg, video\/mp4, audio\/mp3)", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#MEDIA_MIME_TYPE" },
+ "CamelGoogleVertexAIMediaGcsUri": { "index": 29, "kind": "header", "displayName": "", "group": "producer generateMultimodal", "label": "producer generateMultimodal", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The GCS URI of the media file for multimodal input (e.g., gs:\/\/bucket\/image.png)", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#MEDIA_GCS_URI" }, + "CamelGoogleVertexAIPublisher": { "index": 30, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Publisher name for partner models (e.g., anthropic, meta, mistralai)", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#PUBLISHER" }, + "CamelGoogleVertexAIRawResponse": { "index": 31, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The raw JSON response from rawPredict operation", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#RAW_RESPONSE" }, + "CamelGoogleVertexAIAnthropicVersion": { "index": 32, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Anthropic API version for Claude models", "constantName": "org.apache.camel.component.google.vertexai.GoogleVertexAIConstants#ANTHROPIC_VERSION" } }, "properties": { "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": 
"", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.google.vertexai.GoogleVertexAIConfiguration", "configurationField": "configuration", "description": "Google Cloud Project ID" }, diff --git a/components/camel-google/camel-google-vertexai/src/main/docs/google-vertexai-component.adoc b/components/camel-google/camel-google-vertexai/src/main/docs/google-vertexai-component.adoc index 73f3bde1ec55..f61d348762fd 100644 --- a/components/camel-google/camel-google-vertexai/src/main/docs/google-vertexai-component.adoc +++ b/components/camel-google/camel-google-vertexai/src/main/docs/google-vertexai-component.adoc @@ -78,13 +78,25 @@ Use these operations with Gemini, Imagen, and other Google models: |`generateChat` |Generate chat responses +|`generateChatStreaming` +|Generate streaming chat responses (accumulated text returned with chunk count) + |`generateCode` |Generate code from a description + +|`generateImage` +|Generate images using Imagen models + +|`generateEmbeddings` +|Generate text embeddings using embedding models (single text or batch) + +|`generateMultimodal` +|Generate content from multimodal inputs (text + images/video/audio) |=== === Partner Models -Use `rawPredict` for Claude, Llama, and Mistral models: +Use `rawPredict` and `streamRawPredict` for Claude, Llama, and Mistral models: [cols="1,3"] |=== @@ -92,6 +104,9 @@ Use `rawPredict` for Claude, Llama, and Mistral models: |`rawPredict` |Send requests to partner models via the Prediction Service API + +|`streamRawPredict` +|Send streaming requests to partner models (accumulated response with chunk count) |=== Partner models require the `publisher` parameter: `anthropic`, `meta`, or `mistralai`. @@ -131,6 +146,73 @@ from("direct:claude") The component automatically wraps simple text prompts in the required Anthropic format. 
+=== Streaming Chat with Gemini + +[source,java] +---- +from("direct:streaming") + .setBody(constant("Tell me a story")) + .to("google-vertexai:my-project:us-central1:gemini-2.0-flash" + + "?operation=generateChatStreaming&maxOutputTokens=512") + .log("Response: ${body}, Chunks: ${header.CamelGoogleVertexAIChunkCount}"); +---- + +=== Image Generation with Imagen + +[source,java] +---- +from("direct:image") + .setBody(constant("A sunset over mountains")) + .setHeader(GoogleVertexAIConstants.IMAGE_NUMBER_OF_IMAGES, constant(1)) + .to("google-vertexai:my-project:us-central1:imagen-3.0-generate-002" + + "?operation=generateImage") + .log("Generated ${body.size()} image(s)"); +---- + +The body will contain a `List<Image>` with the generated images. Each `Image` has `imageBytes()` and `mimeType()` methods. + +=== Text Embeddings + +[source,java] +---- +from("direct:embed") + .setBody(constant("Apache Camel is an integration framework")) + .to("google-vertexai:my-project:us-central1:text-embedding-005" + + "?operation=generateEmbeddings") + .log("Embedding dimensions: ${body[0].size()}"); +---- + +The body will contain a `List<List<Float>>` — one embedding vector per input text. For batch embeddings, pass a `List<String>` as the body. + +=== Multimodal Input (Image + Text) + +[source,java] +---- +from("direct:multimodal") + .process(exchange -> { + exchange.getMessage().setHeader(GoogleVertexAIConstants.PROMPT, "Describe this image"); + exchange.getMessage().setHeader(GoogleVertexAIConstants.MEDIA_GCS_URI, + "gs://my-bucket/photo.jpg"); + exchange.getMessage().setHeader(GoogleVertexAIConstants.MEDIA_MIME_TYPE, "image/jpeg"); + }) + .to("google-vertexai:my-project:us-central1:gemini-2.0-flash" + + "?operation=generateMultimodal") + .log("${body}"); +---- + +You can also pass inline image bytes via the `CamelGoogleVertexAIMediaData` header instead of a GCS URI. 
+ +=== Streaming Claude with streamRawPredict + +[source,java] +---- +from("direct:stream-claude") + .setBody(constant("What is Apache Camel?")) + .to("google-vertexai:my-project:us-east5:claude-sonnet-4@20250514" + + "?operation=streamRawPredict&publisher=anthropic") + .log("${body}"); +---- + === Claude with Custom Request For more control, pass a JSON or Map body: @@ -157,7 +239,7 @@ from("direct:claude") === Google Models -Use with `generateText`, `generateChat`, or `generateCode`: +Use with `generateText`, `generateChat`, `generateChatStreaming`, `generateCode`, `generateImage`, `generateEmbeddings`, or `generateMultimodal`: * *Gemini*: `gemini-2.5-pro`, `gemini-2.5-flash`, `gemini-2.0-flash`, `gemini-1.5-pro`, `gemini-1.5-flash` * *Imagen*: `imagen-4.0-generate-preview-05-20`, `imagen-3.0-generate-002` @@ -166,7 +248,7 @@ Use with `generateText`, `generateChat`, or `generateCode`: === Partner Models -Use with `rawPredict` and the appropriate `publisher`: +Use with `rawPredict` or `streamRawPredict` and the appropriate `publisher`: [cols="1,2,2"] |=== diff --git a/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java b/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java index c170cc0c710b..d0c4a80e87f2 100644 --- a/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java +++ b/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java @@ -90,6 +90,54 @@ public final class GoogleVertexAIConstants { @Metadata(description = "The number of chunks received in streaming response", javaType = "Integer") public static final String STREAMING_CHUNK_COUNT = "CamelGoogleVertexAIChunkCount"; + // ==================== Streaming Operation Constants ==================== + + 
@Metadata(label = "producer generateChatStreaming", + description = "The number of streaming chunks received", javaType = "Integer") + public static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount"; + + // ==================== Image Generation Operation Constants ==================== + + @Metadata(label = "producer generateImage", + description = "The number of images to generate", javaType = "Integer") + public static final String IMAGE_NUMBER_OF_IMAGES = "CamelGoogleVertexAIImageNumberOfImages"; + + @Metadata(label = "producer generateImage", + description = "The aspect ratio for generated images (e.g., 1:1, 16:9, 9:16, 3:4, 4:3)", javaType = "String") + public static final String IMAGE_ASPECT_RATIO = "CamelGoogleVertexAIImageAspectRatio"; + + @Metadata(label = "generateImage", + description = "The generated images from an image generation operation", + javaType = "java.util.List<com.google.genai.types.Image>") + public static final String GENERATED_IMAGES = "CamelGoogleVertexAIGeneratedImages"; + + // ==================== Embeddings Operation Constants ==================== + + @Metadata(label = "producer generateEmbeddings", + description = "The task type for embeddings (e.g., RETRIEVAL_QUERY, RETRIEVAL_DOCUMENT, SEMANTIC_SIMILARITY, CLASSIFICATION, CLUSTERING, QUESTION_ANSWERING, FACT_VERIFICATION)", + javaType = "String") + public static final String EMBEDDING_TASK_TYPE = "CamelGoogleVertexAIEmbeddingTaskType"; + + @Metadata(label = "producer generateEmbeddings", + description = "The desired output dimensionality for embeddings", javaType = "Integer") + public static final String EMBEDDING_OUTPUT_DIMENSIONALITY = "CamelGoogleVertexAIEmbeddingOutputDimensionality"; + + // ==================== Multimodal Operation Constants ==================== + + @Metadata(label = "producer generateMultimodal", + description = "The media data bytes for multimodal input", javaType = "byte[]") + public static final String MEDIA_DATA = "CamelGoogleVertexAIMediaData"; + 
+ @Metadata(label = "producer generateMultimodal", + description = "The MIME type of the media data (e.g., image/png, image/jpeg, video/mp4, audio/mp3)", + javaType = "String") + public static final String MEDIA_MIME_TYPE = "CamelGoogleVertexAIMediaMimeType"; + + @Metadata(label = "producer generateMultimodal", + description = "The GCS URI of the media file for multimodal input (e.g., gs://bucket/image.png)", + javaType = "String") + public static final String MEDIA_GCS_URI = "CamelGoogleVertexAIMediaGcsUri"; + // ==================== rawPredict Operation Constants ==================== @Metadata(label = "producer", diff --git a/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java b/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java index 3bfb59ae94bc..fc45a3607e01 100644 --- a/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java +++ b/components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java @@ -17,15 +17,28 @@ package org.apache.camel.component.google.vertexai; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.HttpBody; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.aiplatform.v1.PredictionServiceClient; +import com.google.cloud.aiplatform.v1.StreamRawPredictRequest; import com.google.genai.Client; +import com.google.genai.ResponseStream; +import com.google.genai.types.Content; +import com.google.genai.types.ContentEmbedding; +import com.google.genai.types.EmbedContentConfig; +import com.google.genai.types.EmbedContentResponse; import 
com.google.genai.types.GenerateContentConfig; import com.google.genai.types.GenerateContentResponse; +import com.google.genai.types.GenerateImagesConfig; +import com.google.genai.types.GenerateImagesResponse; +import com.google.genai.types.Image; +import com.google.genai.types.Part; import com.google.protobuf.ByteString; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -130,18 +143,127 @@ public class GoogleVertexAIProducer extends DefaultProducer { } private void generateChatStreaming(Exchange exchange) throws Exception { - // TODO: Implement streaming support using client.models.generateContentStream - throw new UnsupportedOperationException("Streaming is not yet implemented"); + String prompt = getPrompt(exchange); + GenerateContentConfig config = buildConfig(exchange); + + Client client = endpoint.getClient(); + String modelId = endpoint.getConfiguration().getModelId(); + String streamOutputMode = endpoint.getConfiguration().getStreamOutputMode(); + + LOG.debug("Generating streaming content with model: {} and prompt: {}", modelId, prompt); + + try (ResponseStream<GenerateContentResponse> stream + = client.models.generateContentStream(modelId, prompt, config)) { + + StringBuilder fullText = new StringBuilder(); + int chunkCount = 0; + GenerateContentResponse lastResponse = null; + + for (GenerateContentResponse chunk : stream) { + chunkCount++; + lastResponse = chunk; + String chunkText = chunk.text(); + if (chunkText != null) { + fullText.append(chunkText); + } + } + + Message message = getMessageForResponse(exchange); + + if ("chunks".equals(streamOutputMode)) { + // In chunks mode, return the full accumulated text but include chunk count + message.setBody(fullText.toString()); + } else { + // In complete mode (default), return the full accumulated text + message.setBody(fullText.toString()); + } + + message.setHeader(GoogleVertexAIConstants.CHUNK_COUNT, chunkCount); + + if (lastResponse != null) { + setMetadataHeaders(exchange, 
lastResponse); + } + } } private void generateImage(Exchange exchange) throws Exception { - // TODO: Implement image generation using Imagen models - throw new UnsupportedOperationException("Image generation is not yet implemented"); + String prompt = getPrompt(exchange); + + Client client = endpoint.getClient(); + String modelId = endpoint.getConfiguration().getModelId(); + + LOG.debug("Generating image with model: {} and prompt: {}", modelId, prompt); + + GenerateImagesConfig.Builder configBuilder = GenerateImagesConfig.builder(); + + Integer numberOfImages = exchange.getIn().getHeader(GoogleVertexAIConstants.IMAGE_NUMBER_OF_IMAGES, Integer.class); + if (numberOfImages != null) { + configBuilder.numberOfImages(numberOfImages); + } + + String aspectRatio = exchange.getIn().getHeader(GoogleVertexAIConstants.IMAGE_ASPECT_RATIO, String.class); + if (aspectRatio != null) { + configBuilder.aspectRatio(aspectRatio); + } + + GenerateImagesResponse response = client.models.generateImages(modelId, prompt, configBuilder.build()); + + Message message = getMessageForResponse(exchange); + + List<Image> images = response.images(); + message.setBody(images); + message.setHeader(GoogleVertexAIConstants.GENERATED_IMAGES, images); + message.setHeader(GoogleVertexAIConstants.MODEL_ID, modelId); } + @SuppressWarnings("unchecked") private void generateEmbeddings(Exchange exchange) throws Exception { - // TODO: Implement embeddings generation - throw new UnsupportedOperationException("Embeddings generation is not yet implemented"); + Client client = endpoint.getClient(); + String modelId = endpoint.getConfiguration().getModelId(); + + LOG.debug("Generating embeddings with model: {}", modelId); + + EmbedContentConfig.Builder configBuilder = EmbedContentConfig.builder(); + + String taskType = exchange.getIn().getHeader(GoogleVertexAIConstants.EMBEDDING_TASK_TYPE, String.class); + if (taskType != null) { + configBuilder.taskType(taskType); + } + + Integer outputDimensionality + = 
exchange.getIn().getHeader(GoogleVertexAIConstants.EMBEDDING_OUTPUT_DIMENSIONALITY, Integer.class); + if (outputDimensionality != null) { + configBuilder.outputDimensionality(outputDimensionality); + } + + EmbedContentConfig embedConfig = configBuilder.build(); + Object body = exchange.getIn().getBody(); + EmbedContentResponse response; + + if (body instanceof List) { + // Batch embeddings + List<String> texts = (List<String>) body; + response = client.models.embedContent(modelId, texts, embedConfig); + } else { + // Single text embedding + String text = exchange.getIn().getBody(String.class); + if (text == null) { + throw new IllegalArgumentException("Text must be provided in body for embeddings generation"); + } + response = client.models.embedContent(modelId, text, embedConfig); + } + + Message message = getMessageForResponse(exchange); + + List<List<Float>> embeddingValues = new ArrayList<>(); + response.embeddings().ifPresent(embeddings -> { + for (ContentEmbedding embedding : embeddings) { + embedding.values().ifPresent(embeddingValues::add); + } + }); + + message.setBody(embeddingValues); + message.setHeader(GoogleVertexAIConstants.MODEL_ID, modelId); } private void generateCode(Exchange exchange) throws Exception { @@ -150,8 +272,59 @@ public class GoogleVertexAIProducer extends DefaultProducer { } private void generateMultimodal(Exchange exchange) throws Exception { - // TODO: Implement multimodal generation with images/video input - throw new UnsupportedOperationException("Multimodal generation is not yet implemented"); + GenerateContentConfig config = buildConfig(exchange); + + Client client = endpoint.getClient(); + String modelId = endpoint.getConfiguration().getModelId(); + + LOG.debug("Generating multimodal content with model: {}", modelId); + + List<Part> parts = new ArrayList<>(); + + // Add text prompt as a part + String prompt = exchange.getIn().getHeader(GoogleVertexAIConstants.PROMPT, String.class); + if (prompt != null) { + 
parts.add(Part.fromText(prompt)); + } + + // Add inline media data if provided + byte[] mediaData = exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_DATA, byte[].class); + String mediaMimeType = exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_MIME_TYPE, String.class); + if (mediaData != null && mediaMimeType != null) { + parts.add(Part.fromBytes(mediaData, mediaMimeType)); + } + + // Add GCS URI media if provided + String gcsUri = exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_GCS_URI, String.class); + if (gcsUri != null) { + String mimeType = mediaMimeType != null ? mediaMimeType : "application/octet-stream"; + parts.add(Part.fromUri(gcsUri, mimeType)); + } + + // If body is byte[] and no media header was set, treat body as media + if (parts.isEmpty() || (prompt != null && mediaData == null && gcsUri == null)) { + Object body = exchange.getIn().getBody(); + if (body instanceof byte[] bodyBytes && mediaMimeType != null) { + parts.add(Part.fromBytes(bodyBytes, mediaMimeType)); + } else if (body instanceof String bodyStr && prompt == null) { + parts.add(Part.fromText(bodyStr)); + } + } + + if (parts.isEmpty()) { + throw new IllegalArgumentException( + "Multimodal generation requires at least one input part (text prompt, media data, or GCS URI)"); + } + + Content content = Content.fromParts(parts.toArray(new Part[0])); + GenerateContentResponse response = client.models.generateContent(modelId, content, config); + + String responseText = response.text(); + + Message message = getMessageForResponse(exchange); + message.setBody(responseText); + + setMetadataHeaders(exchange, response); } /** @@ -222,8 +395,74 @@ public class GoogleVertexAIProducer extends DefaultProducer { * Sends a streaming raw prediction request to partner models. 
*/ private void streamRawPredict(Exchange exchange) throws Exception { - // TODO: Implement streaming rawPredict using streamRawPredict API - throw new UnsupportedOperationException("Streaming rawPredict is not yet implemented"); + GoogleVertexAIConfiguration config = endpoint.getConfiguration(); + PredictionServiceClient predictionClient = endpoint.getPredictionServiceClient(); + + String publisher = config.getPublisher(); + if (ObjectHelper.isEmpty(publisher)) { + throw new IllegalArgumentException("Publisher must be specified for streamRawPredict operation"); + } + + String location = config.getLocation(); + if ("global".equalsIgnoreCase(location)) { + location = "us-east5"; + } + + String endpointName = PredictionServiceClientFactory.buildPublisherModelEndpoint( + config.getProjectId(), + location, + publisher, + config.getModelId()); + + LOG.debug("Sending streamRawPredict request to endpoint: {}", endpointName); + + String requestJson = buildRawPredictRequestBody(exchange, config); + + // Add stream: true for Anthropic + if ("anthropic".equals(publisher) && !requestJson.contains("\"stream\"")) { + Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(requestJson, Map.class); + jsonMap.put("stream", true); + requestJson = OBJECT_MAPPER.writeValueAsString(jsonMap); + } + + LOG.debug("Request body: {}", requestJson); + + HttpBody httpBody = HttpBody.newBuilder() + .setData(ByteString.copyFromUtf8(requestJson)) + .setContentType("application/json") + .build(); + + StreamRawPredictRequest request = StreamRawPredictRequest.newBuilder() + .setEndpoint(endpointName) + .setHttpBody(httpBody) + .build(); + + ServerStream<HttpBody> stream = predictionClient.streamRawPredictCallable().call(request); + + StringBuilder fullResponse = new StringBuilder(); + int chunkCount = 0; + + for (HttpBody responseChunk : stream) { + chunkCount++; + String chunkData = responseChunk.getData().toString(StandardCharsets.UTF_8); + fullResponse.append(chunkData); + } + + Message message = 
getMessageForResponse(exchange); + + String responseJson = fullResponse.toString(); + + if ("anthropic".equals(publisher)) { + String textContent = extractAnthropicStreamingTextContent(responseJson); + message.setBody(textContent); + message.setHeader(GoogleVertexAIConstants.RAW_RESPONSE, responseJson); + } else { + message.setBody(responseJson); + } + + message.setHeader(GoogleVertexAIConstants.PUBLISHER, publisher); + message.setHeader(GoogleVertexAIConstants.MODEL_ID, config.getModelId()); + message.setHeader(GoogleVertexAIConstants.CHUNK_COUNT, chunkCount); } /** @@ -344,6 +583,36 @@ public class GoogleVertexAIProducer extends DefaultProducer { return responseJson; } + /** + * Extracts text content from Anthropic SSE streaming response. Each SSE event is on a separate line prefixed with + * "data: ". We look for content_block_delta events containing text deltas. + */ + @SuppressWarnings("unchecked") + private String extractAnthropicStreamingTextContent(String sseResponse) { + StringBuilder text = new StringBuilder(); + for (String line : sseResponse.split("\n")) { + String trimmed = line.trim(); + if (trimmed.startsWith("data: ")) { + String jsonStr = trimmed.substring(6); + try { + Map<String, Object> event = OBJECT_MAPPER.readValue(jsonStr, Map.class); + if ("content_block_delta".equals(event.get("type"))) { + Map<String, Object> delta = (Map<String, Object>) event.get("delta"); + if (delta != null && "text_delta".equals(delta.get("type"))) { + Object textValue = delta.get("text"); + if (textValue != null) { + text.append(textValue); + } + } + } + } catch (Exception e) { + // Skip non-JSON or unparseable lines + } + } + } + return text.length() > 0 ? 
text.toString() : sseResponse; + } + private String getPrompt(Exchange exchange) { String prompt = exchange.getIn().getHeader(GoogleVertexAIConstants.PROMPT, String.class); if (prompt == null) { diff --git a/components/camel-google/camel-google-vertexai/src/test/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducerOperationsTest.java b/components/camel-google/camel-google-vertexai/src/test/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducerOperationsTest.java new file mode 100644 index 000000000000..a4147dba41ea --- /dev/null +++ b/components/camel-google/camel-google-vertexai/src/test/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducerOperationsTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.vertexai; + +import org.apache.camel.test.junit6.CamelTestSupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Unit tests to verify that all operations are properly configured and routable. 
+ */ +public class GoogleVertexAIProducerOperationsTest extends CamelTestSupport { + + @Test + public void testEndpointWithStreamingOperation() throws Exception { + GoogleVertexAIComponent component = context.getComponent("google-vertexai", GoogleVertexAIComponent.class); + GoogleVertexAIEndpoint endpoint = (GoogleVertexAIEndpoint) component.createEndpoint( + "google-vertexai:my-project:us-central1:gemini-2.0-flash?operation=generateChatStreaming"); + + assertNotNull(endpoint.getConfiguration()); + assertEquals(GoogleVertexAIOperations.generateChatStreaming, endpoint.getConfiguration().getOperation()); + } + + @Test + public void testEndpointWithImageOperation() throws Exception { + GoogleVertexAIComponent component = context.getComponent("google-vertexai", GoogleVertexAIComponent.class); + GoogleVertexAIEndpoint endpoint = (GoogleVertexAIEndpoint) component.createEndpoint( + "google-vertexai:my-project:us-central1:imagen-3.0-generate-002?operation=generateImage"); + + assertNotNull(endpoint.getConfiguration()); + assertEquals(GoogleVertexAIOperations.generateImage, endpoint.getConfiguration().getOperation()); + } + + @Test + public void testEndpointWithEmbeddingsOperation() throws Exception { + GoogleVertexAIComponent component = context.getComponent("google-vertexai", GoogleVertexAIComponent.class); + GoogleVertexAIEndpoint endpoint = (GoogleVertexAIEndpoint) component.createEndpoint( + "google-vertexai:my-project:us-central1:text-embedding-005?operation=generateEmbeddings"); + + assertNotNull(endpoint.getConfiguration()); + assertEquals(GoogleVertexAIOperations.generateEmbeddings, endpoint.getConfiguration().getOperation()); + } + + @Test + public void testEndpointWithMultimodalOperation() throws Exception { + GoogleVertexAIComponent component = context.getComponent("google-vertexai", GoogleVertexAIComponent.class); + GoogleVertexAIEndpoint endpoint = (GoogleVertexAIEndpoint) component.createEndpoint( + 
"google-vertexai:my-project:us-central1:gemini-2.0-flash?operation=generateMultimodal"); + + assertNotNull(endpoint.getConfiguration()); + assertEquals(GoogleVertexAIOperations.generateMultimodal, endpoint.getConfiguration().getOperation()); + } + + @Test + public void testEndpointWithStreamRawPredictOperation() throws Exception { + GoogleVertexAIComponent component = context.getComponent("google-vertexai", GoogleVertexAIComponent.class); + GoogleVertexAIEndpoint endpoint = (GoogleVertexAIEndpoint) component.createEndpoint( + "google-vertexai:my-project:us-east5:claude-sonnet-4@20250514" + + "?operation=streamRawPredict&publisher=anthropic"); + + assertNotNull(endpoint.getConfiguration()); + assertEquals(GoogleVertexAIOperations.streamRawPredict, endpoint.getConfiguration().getOperation()); + assertEquals("anthropic", endpoint.getConfiguration().getPublisher()); + } + + @Test + public void testEndpointWithStreamOutputMode() throws Exception { + GoogleVertexAIComponent component = context.getComponent("google-vertexai", GoogleVertexAIComponent.class); + GoogleVertexAIEndpoint endpoint = (GoogleVertexAIEndpoint) component.createEndpoint( + "google-vertexai:my-project:us-central1:gemini-2.0-flash" + + "?operation=generateChatStreaming&streamOutputMode=chunks"); + + assertNotNull(endpoint.getConfiguration()); + assertEquals("chunks", endpoint.getConfiguration().getStreamOutputMode()); + } + + @Test + public void testAllOperationEnumValues() { + // Verify all operations exist in the enum + assertEquals(9, GoogleVertexAIOperations.values().length); + assertNotNull(GoogleVertexAIOperations.valueOf("generateText")); + assertNotNull(GoogleVertexAIOperations.valueOf("generateChat")); + assertNotNull(GoogleVertexAIOperations.valueOf("generateChatStreaming")); + assertNotNull(GoogleVertexAIOperations.valueOf("generateImage")); + assertNotNull(GoogleVertexAIOperations.valueOf("generateEmbeddings")); + assertNotNull(GoogleVertexAIOperations.valueOf("generateCode")); + 
assertNotNull(GoogleVertexAIOperations.valueOf("generateMultimodal")); + assertNotNull(GoogleVertexAIOperations.valueOf("rawPredict")); + assertNotNull(GoogleVertexAIOperations.valueOf("streamRawPredict")); + } +} diff --git a/components/camel-google/camel-google-vertexai/src/test/java/org/apache/camel/component/google/vertexai/integration/GoogleVertexAINewOperationsIT.java b/components/camel-google/camel-google-vertexai/src/test/java/org/apache/camel/component/google/vertexai/integration/GoogleVertexAINewOperationsIT.java new file mode 100644 index 000000000000..2307d4af3377 --- /dev/null +++ b/components/camel-google/camel-google-vertexai/src/test/java/org/apache/camel/component/google/vertexai/integration/GoogleVertexAINewOperationsIT.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.vertexai.integration; + +import java.util.Arrays; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.vertexai.GoogleVertexAIConstants; +import org.apache.camel.test.junit6.CamelTestSupport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the new Vertex AI operations: streaming, image generation, embeddings (single and batch), + * and multimodal. streamRawPredict has no route or test in this class. + * + * To run: mvn test -Dtest=GoogleVertexAINewOperationsIT -Dgoogle.vertexai.serviceAccountKey=/path/to/key.json + * -Dgoogle.vertexai.project=my-project [-Dgoogle.vertexai.location=us-central1] + */ +@EnabledIfSystemProperty(named = "google.vertexai.serviceAccountKey", matches = ".*", + disabledReason = "System property google.vertexai.serviceAccountKey not provided") +public class GoogleVertexAINewOperationsIT extends CamelTestSupport { + + final String serviceAccountKeyFile = System.getProperty("google.vertexai.serviceAccountKey"); + final String project = System.getProperty("google.vertexai.project"); + final String location = System.getProperty("google.vertexai.location", "us-central1"); + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:streaming") + .to("google-vertexai:" + project + ":" + location + ":gemini-2.0-flash" + + "?serviceAccountKey=file:" + serviceAccountKeyFile + + "&operation=generateChatStreaming&maxOutputTokens=100"); + + from("direct:image") + .to("google-vertexai:" + project + ":" + location + ":imagen-3.0-generate-002" + + "?serviceAccountKey=file:" + serviceAccountKeyFile + + "&operation=generateImage"); + + 
from("direct:embeddings") + .to("google-vertexai:" + project + ":" + location + ":text-embedding-005" + + "?serviceAccountKey=file:" + serviceAccountKeyFile + + "&operation=generateEmbeddings"); + + from("direct:multimodal") + .to("google-vertexai:" + project + ":" + location + ":gemini-2.0-flash" + + "?serviceAccountKey=file:" + serviceAccountKeyFile + + "&operation=generateMultimodal&maxOutputTokens=100"); + } + }; + } + + @Test + public void testGenerateChatStreaming() { + Exchange exchange = template.request("direct:streaming", e -> { + e.getMessage().setBody("What is 2+2? Answer in one word."); + }); + + String response = exchange.getMessage().getBody(String.class); + assertNotNull(response, "Streaming response should not be null"); + assertFalse(response.isEmpty(), "Streaming response should not be empty"); + + Integer chunkCount = exchange.getMessage().getHeader(GoogleVertexAIConstants.CHUNK_COUNT, Integer.class); + assertNotNull(chunkCount, "Chunk count header should be set"); + assertTrue(chunkCount > 0, "Should have received at least one chunk"); + } + + @Test + public void testGenerateImage() { + Exchange exchange = template.request("direct:image", e -> { + e.getMessage().setBody("A simple red circle on a white background"); + e.getMessage().setHeader(GoogleVertexAIConstants.IMAGE_NUMBER_OF_IMAGES, 1); + }); + + List<?> images = exchange.getMessage().getBody(List.class); + assertNotNull(images, "Generated images should not be null"); + assertFalse(images.isEmpty(), "Should have generated at least one image"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGenerateEmbeddings() { + Exchange exchange = template.request("direct:embeddings", e -> { + e.getMessage().setBody("Hello world"); + }); + + List<List<Float>> embeddings = exchange.getMessage().getBody(List.class); + assertNotNull(embeddings, "Embeddings should not be null"); + assertFalse(embeddings.isEmpty(), "Should have at least one embedding"); + 
assertFalse(embeddings.get(0).isEmpty(), "Embedding vector should not be empty"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGenerateEmbeddingsBatch() { + Exchange exchange = template.request("direct:embeddings", e -> { + e.getMessage().setBody(Arrays.asList("Hello world", "Goodbye world")); + }); + + List<List<Float>> embeddings = exchange.getMessage().getBody(List.class); + assertNotNull(embeddings, "Embeddings should not be null"); + assertTrue(embeddings.size() >= 2, "Should have embeddings for each input text"); + } + + @Test + public void testGenerateMultimodal() { + Exchange exchange = template.request("direct:multimodal", e -> { + e.getMessage().setHeader(GoogleVertexAIConstants.PROMPT, "Describe what you see in one sentence."); + e.getMessage().setHeader(GoogleVertexAIConstants.MEDIA_GCS_URI, + "gs://cloud-samples-data/generative-ai/image/scones.jpg"); + e.getMessage().setHeader(GoogleVertexAIConstants.MEDIA_MIME_TYPE, "image/jpeg"); + }); + + String response = exchange.getMessage().getBody(String.class); + assertNotNull(response, "Multimodal response should not be null"); + assertFalse(response.isEmpty(), "Multimodal response should not be empty"); + } +}
