gnodet commented on code in PR #22220:
URL: https://github.com/apache/camel/pull/22220#discussion_r2980251430


##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java:
##########
@@ -90,6 +90,54 @@ public final class GoogleVertexAIConstants {
     @Metadata(description = "The number of chunks received in streaming 
response", javaType = "Integer")
     public static final String STREAMING_CHUNK_COUNT = 
"CamelGoogleVertexAIChunkCount";
 
+    // ==================== Streaming Operation Constants ====================
+
+    @Metadata(label = "producer generateChatStreaming",
+              description = "The number of streaming chunks received", 
javaType = "Integer")
+    public static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";
+

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   This new `CHUNK_COUNT` constant has the same header value 
(`CamelGoogleVertexAIChunkCount`) as the existing `STREAMING_CHUNK_COUNT` on 
line 91 (visible just above in this diff). Two constants now map to the same 
header name, which causes issues in the generated metadata (index 20 is 
skipped in the generated JSON).
   
   Since `STREAMING_CHUNK_COUNT` was introduced in the same development cycle 
and hasn't been released yet, I'd suggest removing `STREAMING_CHUNK_COUNT` 
entirely and keeping only `CHUNK_COUNT`. If it turns out it was already 
released, deprecate `STREAMING_CHUNK_COUNT` and point it to `CHUNK_COUNT`.
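   
   If the deprecation route is needed, it could look roughly like the sketch 
below. `VertexAIConstantsSketch` is a hypothetical, trimmed-down stand-in for 
`GoogleVertexAIConstants`; the `@Metadata` annotations are left out, on the 
assumption that only `CHUNK_COUNT` would carry one.
   ```java
   // Sketch only: a trimmed-down stand-in for GoogleVertexAIConstants.
   final class VertexAIConstantsSketch {

       /** Single source of truth for the header name. */
       static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";

       /**
        * Only needed if the old name already shipped in a release.
        *
        * @deprecated use {@link #CHUNK_COUNT} instead
        */
       @Deprecated
       static final String STREAMING_CHUNK_COUNT = CHUNK_COUNT;

       private VertexAIConstantsSketch() {
       }
   }
   ```
   Aliasing the old constant to the new one keeps the header value defined in 
one place, so the two names cannot drift apart.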



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java:
##########
@@ -90,6 +90,54 @@ public final class GoogleVertexAIConstants {
     @Metadata(description = "The number of chunks received in streaming 
response", javaType = "Integer")
     public static final String STREAMING_CHUNK_COUNT = 
"CamelGoogleVertexAIChunkCount";
 
+    // ==================== Streaming Operation Constants ====================
+
+    @Metadata(label = "producer generateChatStreaming",
+              description = "The number of streaming chunks received", 
javaType = "Integer")
+    public static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";
+
+    // ==================== Image Generation Operation Constants 
====================
+
+    @Metadata(label = "producer generateImage",
+              description = "The number of images to generate", javaType = 
"Integer")
+    public static final String IMAGE_NUMBER_OF_IMAGES = 
"CamelGoogleVertexAIImageNumberOfImages";
+
+    @Metadata(label = "producer generateImage",
+              description = "The aspect ratio for generated images (e.g., 1:1, 
16:9, 9:16, 3:4, 4:3)", javaType = "String")
+    public static final String IMAGE_ASPECT_RATIO = 
"CamelGoogleVertexAIImageAspectRatio";
+
+    @Metadata(label = "generateImage",
+              description = "The generated images from an image generation 
operation",

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   Inconsistent `@Metadata` label: this uses `"generateImage"` while all other 
new constants use `"producer <operationName>"` (e.g., `"producer 
generateImage"`, `"producer generateEmbeddings"`). Should be `"producer 
generateImage"` for consistency.
   
   This also shows up in the generated JSON where the group/label is 
`"generateImage"` without the `"producer"` prefix.



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -130,18 +143,127 @@ private void generateChat(Exchange exchange) throws 
Exception {
     }
 
     private void generateChatStreaming(Exchange exchange) throws Exception {
-        // TODO: Implement streaming support using 
client.models.generateContentStream
-        throw new UnsupportedOperationException("Streaming is not yet 
implemented");
+        String prompt = getPrompt(exchange);
+        GenerateContentConfig config = buildConfig(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+        String streamOutputMode = 
endpoint.getConfiguration().getStreamOutputMode();
+
+        LOG.debug("Generating streaming content with model: {} and prompt: 
{}", modelId, prompt);
+
+        try (ResponseStream<GenerateContentResponse> stream
+                = client.models.generateContentStream(modelId, prompt, 
config)) {
+
+            StringBuilder fullText = new StringBuilder();
+            int chunkCount = 0;
+            GenerateContentResponse lastResponse = null;
+
+            for (GenerateContentResponse chunk : stream) {
+                chunkCount++;
+                lastResponse = chunk;
+                String chunkText = chunk.text();
+                if (chunkText != null) {
+                    fullText.append(chunkText);
+                }
+            }
+
+            Message message = getMessageForResponse(exchange);
+
+            if ("chunks".equals(streamOutputMode)) {
+                // In chunks mode, return the full accumulated text but 
include chunk count

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   Both branches of this `if/else` do exactly the same thing: 
`message.setBody(fullText.toString())`. The comment says "In chunks mode, 
return the full accumulated text but include chunk count" but the default mode 
also returns the same thing.
   
   Either:
   - Implement differentiated behavior (e.g., chunks mode returns 
`List<String>` of individual chunks, or uses Camel streaming/reactive 
patterns), or
   - Remove the `streamOutputMode` check entirely since it's dead code.
   
   As-is, this is misleading: users will set `streamOutputMode=chunks` 
expecting different behavior but get the exact same result.
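   
   One possible shape for the first option, as a minimal sketch: collect the 
chunk texts in a list while iterating and pick the body afterwards. 
`StreamBodySketch` and `bodyFor` are hypothetical names, not part of the PR, 
and the `List<String>` body for chunks mode is only one of the choices 
mentioned above.
   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class StreamBodySketch {

       // Hypothetical helper: chooses the message body from streamOutputMode.
       static Object bodyFor(String streamOutputMode, List<String> chunkTexts) {
           if ("chunks".equals(streamOutputMode)) {
               // chunks mode: hand back every chunk separately, in arrival order
               return new ArrayList<>(chunkTexts);
           }
           // complete mode (default): one concatenated string
           return String.join("", chunkTexts);
       }

       private StreamBodySketch() {
       }
   }
   ```
   In the producer this would amount to 
`message.setBody(bodyFor(streamOutputMode, chunkTexts))`, with `chunkTexts` 
filled inside the existing `for` loop.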



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -150,8 +272,59 @@ private void generateCode(Exchange exchange) throws 
Exception {
     }
 
     private void generateMultimodal(Exchange exchange) throws Exception {
-        // TODO: Implement multimodal generation with images/video input
-        throw new UnsupportedOperationException("Multimodal generation is not 
yet implemented");
+        GenerateContentConfig config = buildConfig(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+
+        LOG.debug("Generating multimodal content with model: {}", modelId);
+
+        List<Part> parts = new ArrayList<>();
+
+        // Add text prompt as a part
+        String prompt = 
exchange.getIn().getHeader(GoogleVertexAIConstants.PROMPT, String.class);
+        if (prompt != null) {
+            parts.add(Part.fromText(prompt));
+        }
+
+        // Add inline media data if provided
+        byte[] mediaData = 
exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_DATA, byte[].class);
+        String mediaMimeType = 
exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_MIME_TYPE, 
String.class);
+        if (mediaData != null && mediaMimeType != null) {
+            parts.add(Part.fromBytes(mediaData, mediaMimeType));
+        }
+
+        // Add GCS URI media if provided
+        String gcsUri = 
exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_GCS_URI, String.class);
+        if (gcsUri != null) {
+            String mimeType = mediaMimeType != null ? mediaMimeType : 
"application/octet-stream";
+            parts.add(Part.fromUri(gcsUri, mimeType));
+        }
+
+        // If body is byte[] and no media header was set, treat body as media
+        if (parts.isEmpty() || (prompt != null && mediaData == null && gcsUri 
== null)) {
+            Object body = exchange.getIn().getBody();

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   This condition is hard to follow:
   ```java
   if (parts.isEmpty() || (prompt != null && mediaData == null && gcsUri == null))
   ```
   
   The second clause means "prompt was given but no media headers were set", 
so we check whether the body can serve as media. If `parts.isEmpty()` is true, 
there is no prompt and no media headers, so the body is also checked as a 
fallback.
   
   This would be clearer split into explicit cases:
   ```java
   // If no media was provided via headers, check if body contains media
   if (mediaData == null && gcsUri == null) {
       Object body = exchange.getIn().getBody();
       if (body instanceof byte[] bodyBytes && mediaMimeType != null) {
           parts.add(Part.fromBytes(bodyBytes, mediaMimeType));
       } else if (body instanceof String bodyStr && prompt == null) {
           parts.add(Part.fromText(bodyStr));
       }
   }
   ```



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -344,6 +583,36 @@ private String extractAnthropicTextContent(String 
responseJson) {
         return responseJson;
     }
 
+    /**
+     * Extracts text content from Anthropic SSE streaming response. Each SSE 
event is on a separate line prefixed with
+     * "data: ". We look for content_block_delta events containing text deltas.
+     */
+    @SuppressWarnings("unchecked")
+    private String extractAnthropicStreamingTextContent(String sseResponse) {
+        StringBuilder text = new StringBuilder();
+        for (String line : sseResponse.split("\n")) {
+            String trimmed = line.trim();
+            if (trimmed.startsWith("data: ")) {
+                String jsonStr = trimmed.substring(6);
+                try {
+                    Map<String, Object> event = 
OBJECT_MAPPER.readValue(jsonStr, Map.class);
+                    if ("content_block_delta".equals(event.get("type"))) {
+                        Map<String, Object> delta = (Map<String, Object>) 
event.get("delta");
+                        if (delta != null && 
"text_delta".equals(delta.get("type"))) {
+                            Object textValue = delta.get("text");
+                            if (textValue != null) {
+                                text.append(textValue);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    // Skip non-JSON or unparseable lines
+                }

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   This empty catch block silently swallows all exceptions. If a line starts 
with `data: ` but contains malformed JSON, it is ignored, which could lead to 
truncated text output with no indication of the problem.
   
   At minimum, add 
`LOG.trace("Skipping unparseable SSE line: {}", jsonStr, e);` 
so there's some visibility when debugging.
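   
   A self-contained sketch of the loop with the skip made visible. To keep it 
runnable without Jackson or SLF4J, it makes two assumptions: the JSON handling 
is replaced by a hypothetical `deltaExtractor` function (returns the text 
delta, `null` for other events, and throws on malformed input), and 
`java.util.logging` at `FINEST` stands in for `LOG.trace`.
   ```java
   import java.util.function.Function;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   final class SseSkipLoggingSketch {

       private static final Logger LOG = Logger.getLogger(SseSkipLoggingSketch.class.getName());

       // deltaExtractor is a stand-in for the Jackson-based parsing in the PR.
       static String collectText(String sseResponse, Function<String, String> deltaExtractor) {
           StringBuilder text = new StringBuilder();
           for (String line : sseResponse.split("\n")) {
               String trimmed = line.trim();
               if (!trimmed.startsWith("data: ")) {
                   continue; // not an SSE data line
               }
               String payload = trimmed.substring("data: ".length());
               try {
                   String delta = deltaExtractor.apply(payload);
                   if (delta != null) {
                       text.append(delta);
                   }
               } catch (RuntimeException e) {
                   // Record the skipped line instead of dropping it silently.
                   LOG.log(Level.FINEST, "Skipping unparseable SSE line: " + payload, e);
               }
           }
           return text.toString();
       }

       private SseSkipLoggingSketch() {
       }
   }
   ```
   The output is unchanged for well-formed input; the only difference is that 
a skipped line leaves a trace-level record.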



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -222,8 +395,74 @@ private void rawPredict(Exchange exchange) throws 
Exception {
      * Sends a streaming raw prediction request to partner models.
      */
     private void streamRawPredict(Exchange exchange) throws Exception {
-        // TODO: Implement streaming rawPredict using streamRawPredict API
-        throw new UnsupportedOperationException("Streaming rawPredict is not 
yet implemented");
+        GoogleVertexAIConfiguration config = endpoint.getConfiguration();
+        PredictionServiceClient predictionClient = 
endpoint.getPredictionServiceClient();
+
+        String publisher = config.getPublisher();
+        if (ObjectHelper.isEmpty(publisher)) {
+            throw new IllegalArgumentException("Publisher must be specified 
for streamRawPredict operation");
+        }
+
+        String location = config.getLocation();
+        if ("global".equalsIgnoreCase(location)) {
+            location = "us-east5";
+        }

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   Hardcoding `"us-east5"` as a fallback for `"global"` location is fragile. 
This behavior should be documented (either in the code with a comment 
explaining why, or in the component docs). Is there a reason `us-central1` 
isn't used as the default like elsewhere? Partner models may have different 
regional availability, but the choice should be transparent to the user.
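   
   If the fallback stays, one way to make it visible is a named constant with 
the rationale next to it. This is only a sketch: `PartnerLocationSketch`, 
`GLOBAL_FALLBACK_LOCATION`, and `resolve` are hypothetical names, and the 
actual reason for choosing `us-east5` still has to come from the PR author.
   ```java
   final class PartnerLocationSketch {

       // Region used when the configured location is "global".
       // The rationale for this value should be documented here by the author.
       static final String GLOBAL_FALLBACK_LOCATION = "us-east5";

       // Maps "global" (any case) to the fallback region, leaves others untouched.
       static String resolve(String configuredLocation) {
           if ("global".equalsIgnoreCase(configuredLocation)) {
               return GLOBAL_FALLBACK_LOCATION;
           }
           return configuredLocation;
       }

       private PartnerLocationSketch() {
       }
   }
   ```
   Both `rawPredict` and `streamRawPredict` could then share the same helper, 
so the substitution is defined and explained in one place.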



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -222,8 +395,74 @@ private void rawPredict(Exchange exchange) throws 
Exception {
      * Sends a streaming raw prediction request to partner models.
      */
     private void streamRawPredict(Exchange exchange) throws Exception {
-        // TODO: Implement streaming rawPredict using streamRawPredict API
-        throw new UnsupportedOperationException("Streaming rawPredict is not 
yet implemented");
+        GoogleVertexAIConfiguration config = endpoint.getConfiguration();
+        PredictionServiceClient predictionClient = 
endpoint.getPredictionServiceClient();
+
+        String publisher = config.getPublisher();
+        if (ObjectHelper.isEmpty(publisher)) {
+            throw new IllegalArgumentException("Publisher must be specified 
for streamRawPredict operation");
+        }
+
+        String location = config.getLocation();
+        if ("global".equalsIgnoreCase(location)) {
+            location = "us-east5";
+        }
+
+        String endpointName = 
PredictionServiceClientFactory.buildPublisherModelEndpoint(
+                config.getProjectId(),
+                location,
+                publisher,
+                config.getModelId());
+
+        LOG.debug("Sending streamRawPredict request to endpoint: {}", 
endpointName);
+
+        String requestJson = buildRawPredictRequestBody(exchange, config);
+
+        // Add stream: true for Anthropic
+        if ("anthropic".equals(publisher) && 
!requestJson.contains("\"stream\"")) {
+            Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(requestJson, 
Map.class);
+            jsonMap.put("stream", true);
+            requestJson = OBJECT_MAPPER.writeValueAsString(jsonMap);
+        }
+
+        LOG.debug("Request body: {}", requestJson);
+
+        HttpBody httpBody = HttpBody.newBuilder()
+                .setData(ByteString.copyFromUtf8(requestJson))
+                .setContentType("application/json")
+                .build();
+
+        StreamRawPredictRequest request = StreamRawPredictRequest.newBuilder()
+                .setEndpoint(endpointName)
+                .setHttpBody(httpBody)
+                .build();
+
+        ServerStream<HttpBody> stream = 
predictionClient.streamRawPredictCallable().call(request);
+

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   Unlike `generateChatStreaming()` which properly uses try-with-resources on 
`ResponseStream`, the `ServerStream<HttpBody>` here is never closed. If the 
iteration is interrupted (e.g., by an exception during `append`), the 
underlying gRPC stream won't be cleaned up.
   
   Consider wrapping the iteration in a try block and calling 
`stream.cancel()` in a finally block.
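   
   A minimal sketch of that pattern. To stay runnable without the gax 
dependency, it uses a hypothetical `CancellableStream` interface in place of 
`ServerStream<HttpBody>`, and plain strings in place of the chunk payloads. 
Cancelling only when iteration did not finish is a design choice of this 
sketch, not something the PR prescribes.
   ```java
   // Hypothetical stand-in for a server stream that can be iterated and cancelled.
   interface CancellableStream<T> extends Iterable<T> {
       void cancel();
   }

   final class StreamCleanupSketch {

       // Concatenates all chunks; cancels the stream if iteration ends early.
       static String drain(CancellableStream<String> stream) {
           StringBuilder out = new StringBuilder();
           boolean completed = false;
           try {
               for (String chunk : stream) {
                   out.append(chunk);
               }
               completed = true;
               return out.toString();
           } finally {
               if (!completed) {
                   // Iteration stopped early: release the underlying call.
                   stream.cancel();
               }
           }
       }

       private StreamCleanupSketch() {
       }
   }
   ```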



##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -130,18 +143,127 @@ private void generateChat(Exchange exchange) throws 
Exception {
     }
 
     private void generateChatStreaming(Exchange exchange) throws Exception {
-        // TODO: Implement streaming support using 
client.models.generateContentStream
-        throw new UnsupportedOperationException("Streaming is not yet 
implemented");
+        String prompt = getPrompt(exchange);
+        GenerateContentConfig config = buildConfig(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+        String streamOutputMode = 
endpoint.getConfiguration().getStreamOutputMode();
+
+        LOG.debug("Generating streaming content with model: {} and prompt: 
{}", modelId, prompt);
+
+        try (ResponseStream<GenerateContentResponse> stream
+                = client.models.generateContentStream(modelId, prompt, 
config)) {
+
+            StringBuilder fullText = new StringBuilder();
+            int chunkCount = 0;
+            GenerateContentResponse lastResponse = null;
+
+            for (GenerateContentResponse chunk : stream) {
+                chunkCount++;
+                lastResponse = chunk;
+                String chunkText = chunk.text();
+                if (chunkText != null) {
+                    fullText.append(chunkText);
+                }
+            }
+
+            Message message = getMessageForResponse(exchange);
+
+            if ("chunks".equals(streamOutputMode)) {
+                // In chunks mode, return the full accumulated text but 
include chunk count
+                message.setBody(fullText.toString());
+            } else {
+                // In complete mode (default), return the full accumulated text
+                message.setBody(fullText.toString());
+            }
+
+            message.setHeader(GoogleVertexAIConstants.CHUNK_COUNT, chunkCount);
+
+            if (lastResponse != null) {
+                setMetadataHeaders(exchange, lastResponse);
+            }
+        }
     }
 
     private void generateImage(Exchange exchange) throws Exception {
-        // TODO: Implement image generation using Imagen models
-        throw new UnsupportedOperationException("Image generation is not yet 
implemented");
+        String prompt = getPrompt(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+
+        LOG.debug("Generating image with model: {} and prompt: {}", modelId, 
prompt);
+
+        GenerateImagesConfig.Builder configBuilder = 
GenerateImagesConfig.builder();
+
+        Integer numberOfImages = 
exchange.getIn().getHeader(GoogleVertexAIConstants.IMAGE_NUMBER_OF_IMAGES, 
Integer.class);
+        if (numberOfImages != null) {
+            configBuilder.numberOfImages(numberOfImages);
+        }
+
+        String aspectRatio = 
exchange.getIn().getHeader(GoogleVertexAIConstants.IMAGE_ASPECT_RATIO, 
String.class);
+        if (aspectRatio != null) {
+            configBuilder.aspectRatio(aspectRatio);
+        }
+
+        GenerateImagesResponse response = 
client.models.generateImages(modelId, prompt, configBuilder.build());
+
+        Message message = getMessageForResponse(exchange);
+
+        List<Image> images = response.images();
+        message.setBody(images);
+        message.setHeader(GoogleVertexAIConstants.GENERATED_IMAGES, images);
+        message.setHeader(GoogleVertexAIConstants.MODEL_ID, modelId);
     }
 
+    @SuppressWarnings("unchecked")
     private void generateEmbeddings(Exchange exchange) throws Exception {
-        // TODO: Implement embeddings generation
-        throw new UnsupportedOperationException("Embeddings generation is not 
yet implemented");
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+
+        LOG.debug("Generating embeddings with model: {}", modelId);
+
+        EmbedContentConfig.Builder configBuilder = 
EmbedContentConfig.builder();
+
+        String taskType = 
exchange.getIn().getHeader(GoogleVertexAIConstants.EMBEDDING_TASK_TYPE, 
String.class);
+        if (taskType != null) {
+            configBuilder.taskType(taskType);
+        }
+
+        Integer outputDimensionality
+                = 
exchange.getIn().getHeader(GoogleVertexAIConstants.EMBEDDING_OUTPUT_DIMENSIONALITY,
 Integer.class);
+        if (outputDimensionality != null) {
+            configBuilder.outputDimensionality(outputDimensionality);
+        }
+
+        EmbedContentConfig embedConfig = configBuilder.build();
+        Object body = exchange.getIn().getBody();
+        EmbedContentResponse response;
+
+        if (body instanceof List) {
+            // Batch embeddings
+            List<String> texts = (List<String>) body;
+            response = client.models.embedContent(modelId, texts, embedConfig);

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   The cast `(List<String>) body` is unchecked: if the body is a `List` of 
non-String objects (e.g., `List<Integer>`), this will compile fine but throw a 
confusing `ClassCastException` later when the SDK tries to use the elements. 
Consider:
   ```java
   List<?> list = (List<?>) body;
   List<String> texts = list.stream()
       .map(Object::toString)
       .collect(Collectors.toList());
   ```
   or at least validate the element types.
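   
   The validating variant could be sketched as follows. `EmbeddingInputSketch` 
and `toTexts` are hypothetical names; the idea is to reject a wrong element up 
front with a message that names its position and type, rather than converting 
it silently.
   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class EmbeddingInputSketch {

       // Copies the list into a List<String>, rejecting any non-String element.
       static List<String> toTexts(List<?> body) {
           List<String> texts = new ArrayList<>(body.size());
           for (int i = 0; i < body.size(); i++) {
               Object element = body.get(i);
               if (!(element instanceof String)) {
                   String type = element == null ? "null" : element.getClass().getName();
                   throw new IllegalArgumentException(
                           "Batch embeddings expect a List of String, but element " + i + " is " + type);
               }
               texts.add((String) element);
           }
           return texts;
       }

       private EmbeddingInputSketch() {
       }
   }
   ```
   Compared with `Object::toString`, this fails fast on `null` or unexpected 
element types, which may be preferable when a wrong body type signals a 
routing mistake.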



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
