gnodet commented on code in PR #22220:
URL: https://github.com/apache/camel/pull/22220#discussion_r2980251430
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java:
##########
@@ -90,6 +90,54 @@ public final class GoogleVertexAIConstants {
     @Metadata(description = "The number of chunks received in streaming response", javaType = "Integer")
     public static final String STREAMING_CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";
+    // ==================== Streaming Operation Constants ====================
+
+    @Metadata(label = "producer generateChatStreaming",
+              description = "The number of streaming chunks received", javaType = "Integer")
+    public static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";
+
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
This new `CHUNK_COUNT` constant has the same header value
(`CamelGoogleVertexAIChunkCount`) as the existing `STREAMING_CHUNK_COUNT` on
line 91 (visible just above in this diff). This creates a duplicate constant
for the same header name, which causes issues in the generated metadata (index
20 is skipped in the generated JSON).
Since `STREAMING_CHUNK_COUNT` was introduced in the same development cycle
and hasn't been released yet, I'd suggest removing `STREAMING_CHUNK_COUNT`
entirely and keeping only `CHUNK_COUNT`. If it was already released, deprecate
`STREAMING_CHUNK_COUNT` and point to `CHUNK_COUNT`.
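For the already-released case, the deprecation could look roughly like this (a self-contained sketch; the real class also carries `@Metadata` annotations, omitted here so the snippet compiles on its own):

```java
public final class GoogleVertexAIConstants {

    // Single source of truth for the header name
    public static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";

    /**
     * @deprecated use {@link #CHUNK_COUNT} instead; both constants map to the
     *             same header name, so only one should remain in the metadata.
     */
    @Deprecated
    public static final String STREAMING_CHUNK_COUNT = CHUNK_COUNT;

    private GoogleVertexAIConstants() {
    }
}
```

Aliasing the deprecated constant to the new one keeps old user code compiling while ensuring both resolve to the identical header value.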
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIConstants.java:
##########
@@ -90,6 +90,54 @@ public final class GoogleVertexAIConstants {
     @Metadata(description = "The number of chunks received in streaming response", javaType = "Integer")
     public static final String STREAMING_CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";
+    // ==================== Streaming Operation Constants ====================
+
+    @Metadata(label = "producer generateChatStreaming",
+              description = "The number of streaming chunks received", javaType = "Integer")
+    public static final String CHUNK_COUNT = "CamelGoogleVertexAIChunkCount";
+
+    // ==================== Image Generation Operation Constants ====================
+
+    @Metadata(label = "producer generateImage",
+              description = "The number of images to generate", javaType = "Integer")
+    public static final String IMAGE_NUMBER_OF_IMAGES = "CamelGoogleVertexAIImageNumberOfImages";
+
+    @Metadata(label = "producer generateImage",
+              description = "The aspect ratio for generated images (e.g., 1:1, 16:9, 9:16, 3:4, 4:3)", javaType = "String")
+    public static final String IMAGE_ASPECT_RATIO = "CamelGoogleVertexAIImageAspectRatio";
+
+    @Metadata(label = "generateImage",
+              description = "The generated images from an image generation operation",
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
Inconsistent `@Metadata` label: this uses `"generateImage"` while all other
new constants use `"producer <operationName>"` (e.g., `"producer
generateImage"`, `"producer generateEmbeddings"`). Should be `"producer
generateImage"` for consistency.
This also shows up in the generated JSON where the group/label is
`"generateImage"` without the `"producer"` prefix.
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -130,18 +143,127 @@ private void generateChat(Exchange exchange) throws Exception {
     }
     private void generateChatStreaming(Exchange exchange) throws Exception {
-        // TODO: Implement streaming support using client.models.generateContentStream
-        throw new UnsupportedOperationException("Streaming is not yet implemented");
+        String prompt = getPrompt(exchange);
+        GenerateContentConfig config = buildConfig(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+        String streamOutputMode = endpoint.getConfiguration().getStreamOutputMode();
+
+        LOG.debug("Generating streaming content with model: {} and prompt: {}", modelId, prompt);
+
+        try (ResponseStream<GenerateContentResponse> stream
+                = client.models.generateContentStream(modelId, prompt, config)) {
+
+            StringBuilder fullText = new StringBuilder();
+            int chunkCount = 0;
+            GenerateContentResponse lastResponse = null;
+
+            for (GenerateContentResponse chunk : stream) {
+                chunkCount++;
+                lastResponse = chunk;
+                String chunkText = chunk.text();
+                if (chunkText != null) {
+                    fullText.append(chunkText);
+                }
+            }
+
+            Message message = getMessageForResponse(exchange);
+
+            if ("chunks".equals(streamOutputMode)) {
+                // In chunks mode, return the full accumulated text but include chunk count
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
Both branches of this `if/else` do exactly the same thing:
`message.setBody(fullText.toString())`. The comment says "In chunks mode,
return the full accumulated text but include chunk count" but the default mode
also returns the same thing.
Either:
- Implement differentiated behavior (e.g., chunks mode returns
`List<String>` of individual chunks, or uses Camel streaming/reactive
patterns), or
- Remove the `streamOutputMode` check entirely since it's dead code.
As-is, this is misleading — users will set `streamOutputMode=chunks`
expecting different behavior but get the exact same result.
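A minimal sketch of what the differentiated behavior could look like, assuming the chunks are collected into a list during iteration (class and method names here are illustrative, not from the PR):

```java
import java.util.List;

// Sketch: chunks mode exposes the individual streaming chunks as a List,
// while complete mode (the default) joins them into a single String.
public class StreamOutputModeSketch {

    static Object buildBody(String streamOutputMode, List<String> chunks) {
        if ("chunks".equals(streamOutputMode)) {
            // chunks mode: the message body is the list of raw chunks
            return chunks;
        }
        // complete mode (default): the message body is the accumulated text
        return String.join("", chunks);
    }
}
```

With something like this, `streamOutputMode=chunks` actually changes the body type, which matches what the option name promises.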
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -150,8 +272,59 @@ private void generateCode(Exchange exchange) throws Exception {
     }
     private void generateMultimodal(Exchange exchange) throws Exception {
-        // TODO: Implement multimodal generation with images/video input
-        throw new UnsupportedOperationException("Multimodal generation is not yet implemented");
+        GenerateContentConfig config = buildConfig(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+
+        LOG.debug("Generating multimodal content with model: {}", modelId);
+
+        List<Part> parts = new ArrayList<>();
+
+        // Add text prompt as a part
+        String prompt = exchange.getIn().getHeader(GoogleVertexAIConstants.PROMPT, String.class);
+        if (prompt != null) {
+            parts.add(Part.fromText(prompt));
+        }
+
+        // Add inline media data if provided
+        byte[] mediaData = exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_DATA, byte[].class);
+        String mediaMimeType = exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_MIME_TYPE, String.class);
+        if (mediaData != null && mediaMimeType != null) {
+            parts.add(Part.fromBytes(mediaData, mediaMimeType));
+        }
+
+        // Add GCS URI media if provided
+        String gcsUri = exchange.getIn().getHeader(GoogleVertexAIConstants.MEDIA_GCS_URI, String.class);
+        if (gcsUri != null) {
+            String mimeType = mediaMimeType != null ? mediaMimeType : "application/octet-stream";
+            parts.add(Part.fromUri(gcsUri, mimeType));
+        }
+
+        // If body is byte[] and no media header was set, treat body as media
+        if (parts.isEmpty() || (prompt != null && mediaData == null && gcsUri == null)) {
+            Object body = exchange.getIn().getBody();
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
This condition is hard to follow:
```java
if (parts.isEmpty() || (prompt != null && mediaData == null && gcsUri == null))
```
The second clause means "prompt was given but no media headers were set" —
so we check if body can serve as media. But if `parts.isEmpty()` is true, it
means no prompt AND no media headers, so we also check body as a fallback.
This would be clearer split into explicit cases:
```java
// If no media was provided via headers, check if body contains media
if (mediaData == null && gcsUri == null) {
Object body = exchange.getIn().getBody();
if (body instanceof byte[] bodyBytes && mediaMimeType != null) {
parts.add(Part.fromBytes(bodyBytes, mediaMimeType));
} else if (body instanceof String bodyStr && prompt == null) {
parts.add(Part.fromText(bodyStr));
}
}
```
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -344,6 +583,36 @@ private String extractAnthropicTextContent(String responseJson) {
         return responseJson;
     }
+    /**
+     * Extracts text content from Anthropic SSE streaming response. Each SSE event is on a separate line prefixed with
+     * "data: ". We look for content_block_delta events containing text deltas.
+     */
+    @SuppressWarnings("unchecked")
+    private String extractAnthropicStreamingTextContent(String sseResponse) {
+        StringBuilder text = new StringBuilder();
+        for (String line : sseResponse.split("\n")) {
+            String trimmed = line.trim();
+            if (trimmed.startsWith("data: ")) {
+                String jsonStr = trimmed.substring(6);
+                try {
+                    Map<String, Object> event = OBJECT_MAPPER.readValue(jsonStr, Map.class);
+                    if ("content_block_delta".equals(event.get("type"))) {
+                        Map<String, Object> delta = (Map<String, Object>) event.get("delta");
+                        if (delta != null && "text_delta".equals(delta.get("type"))) {
+                            Object textValue = delta.get("text");
+                            if (textValue != null) {
+                                text.append(textValue);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    // Skip non-JSON or unparseable lines
+                }
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
This empty catch block silently swallows all exceptions. If a line starts with `data: ` but contains malformed JSON, it will be silently ignored, which could lead to truncated text output with no indication of the problem.
At minimum, add `LOG.trace("Skipping unparseable SSE line: {}", jsonStr, e);` so there is some visibility when debugging.
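A self-contained sketch of the pattern being asked for; the JSON check here is a stand-in for the real Jackson `readValue` call, and the class name is illustrative. The point is only that the catch block records the failure instead of being empty:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public class SseSkipLogging {

    private static final Logger LOG = Logger.getLogger(SseSkipLogging.class.getName());

    // Collects "data: " payloads; unparseable lines are trace-logged and
    // counted rather than silently dropped.
    static List<String> extractDataPayloads(String sseResponse, List<String> skipped) {
        List<String> payloads = new ArrayList<>();
        for (String line : sseResponse.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.startsWith("data: ")) {
                String jsonStr = trimmed.substring(6);
                try {
                    if (!jsonStr.startsWith("{")) { // stand-in for OBJECT_MAPPER.readValue(jsonStr, Map.class)
                        throw new IllegalArgumentException("not a JSON object");
                    }
                    payloads.add(jsonStr);
                } catch (Exception e) {
                    // Visibility instead of silence: log at trace level and record the skip
                    LOG.finest(() -> "Skipping unparseable SSE line: " + jsonStr);
                    skipped.add(jsonStr);
                }
            }
        }
        return payloads;
    }
}
```

In the producer itself the `skipped` list is unnecessary; a single `LOG.trace(...)` in the catch block is enough to make truncated output diagnosable.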
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -222,8 +395,74 @@ private void rawPredict(Exchange exchange) throws Exception {
      * Sends a streaming raw prediction request to partner models.
      */
     private void streamRawPredict(Exchange exchange) throws Exception {
-        // TODO: Implement streaming rawPredict using streamRawPredict API
-        throw new UnsupportedOperationException("Streaming rawPredict is not yet implemented");
+        GoogleVertexAIConfiguration config = endpoint.getConfiguration();
+        PredictionServiceClient predictionClient = endpoint.getPredictionServiceClient();
+
+        String publisher = config.getPublisher();
+        if (ObjectHelper.isEmpty(publisher)) {
+            throw new IllegalArgumentException("Publisher must be specified for streamRawPredict operation");
+        }
+
+        String location = config.getLocation();
+        if ("global".equalsIgnoreCase(location)) {
+            location = "us-east5";
+        }
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
Hardcoding `"us-east5"` as a fallback for `"global"` location is fragile.
This behavior should be documented (either in the code with a comment
explaining why, or in the component docs). Is there a reason `us-central1`
isn't used as the default like elsewhere? Partner models may have different
regional availability, but the choice should be transparent to the user.
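One way to make the choice transparent is to extract it into a documented constant (a sketch; the names and the javadoc rationale are illustrative and would need to be verified against actual partner-model regional availability):

```java
public class LocationFallbackSketch {

    /**
     * Partner models are not necessarily served from the "global" location,
     * so fall back to a concrete region. Whatever region is chosen, the
     * reason should be documented here or the value made configurable.
     */
    static final String PARTNER_MODEL_FALLBACK_LOCATION = "us-east5";

    static String resolveLocation(String configured) {
        if ("global".equalsIgnoreCase(configured)) {
            return PARTNER_MODEL_FALLBACK_LOCATION;
        }
        return configured;
    }
}
```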
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -222,8 +395,74 @@ private void rawPredict(Exchange exchange) throws Exception {
      * Sends a streaming raw prediction request to partner models.
      */
     private void streamRawPredict(Exchange exchange) throws Exception {
-        // TODO: Implement streaming rawPredict using streamRawPredict API
-        throw new UnsupportedOperationException("Streaming rawPredict is not yet implemented");
+        GoogleVertexAIConfiguration config = endpoint.getConfiguration();
+        PredictionServiceClient predictionClient = endpoint.getPredictionServiceClient();
+
+        String publisher = config.getPublisher();
+        if (ObjectHelper.isEmpty(publisher)) {
+            throw new IllegalArgumentException("Publisher must be specified for streamRawPredict operation");
+        }
+
+        String location = config.getLocation();
+        if ("global".equalsIgnoreCase(location)) {
+            location = "us-east5";
+        }
+
+        String endpointName = PredictionServiceClientFactory.buildPublisherModelEndpoint(
+                config.getProjectId(),
+                location,
+                publisher,
+                config.getModelId());
+
+        LOG.debug("Sending streamRawPredict request to endpoint: {}", endpointName);
+
+        String requestJson = buildRawPredictRequestBody(exchange, config);
+
+        // Add stream: true for Anthropic
+        if ("anthropic".equals(publisher) && !requestJson.contains("\"stream\"")) {
+            Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(requestJson, Map.class);
+            jsonMap.put("stream", true);
+            requestJson = OBJECT_MAPPER.writeValueAsString(jsonMap);
+        }
+
+        LOG.debug("Request body: {}", requestJson);
+
+        HttpBody httpBody = HttpBody.newBuilder()
+                .setData(ByteString.copyFromUtf8(requestJson))
+                .setContentType("application/json")
+                .build();
+
+        StreamRawPredictRequest request = StreamRawPredictRequest.newBuilder()
+                .setEndpoint(endpointName)
+                .setHttpBody(httpBody)
+                .build();
+
+        ServerStream<HttpBody> stream = predictionClient.streamRawPredictCallable().call(request);
+
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
Unlike `generateChatStreaming()` which properly uses try-with-resources on
`ResponseStream`, the `ServerStream<HttpBody>` here is never closed. If the
iteration is interrupted (e.g., by an exception during `append`), the
underlying gRPC stream won't be cleaned up.
Consider wrapping in a try block or calling `stream.cancel()` in a finally
block.
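The cleanup pattern could be sketched like this, using a minimal stand-in for the gRPC stream type (the real `ServerStream` lives in `com.google.api.gax.rpc` and has a `cancel()` method; everything else here is illustrative):

```java
public class StreamCancelSketch {

    // Stand-in for com.google.api.gax.rpc.ServerStream<T>
    interface CancellableStream<T> extends Iterable<T> {
        void cancel();
    }

    // Consumes the stream into the sink; if iteration is interrupted by an
    // exception, cancel() releases the underlying stream before rethrowing.
    static <T> StringBuilder consume(CancellableStream<T> stream, StringBuilder sink) {
        boolean completed = false;
        try {
            for (T item : stream) {
                sink.append(item);
            }
            completed = true;
        } finally {
            if (!completed) {
                stream.cancel();
            }
        }
        return sink;
    }
}
```

Guarding the `cancel()` with a `completed` flag avoids cancelling a stream that already finished normally, which is the usual convention for gRPC server streams.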
##########
components/camel-google/camel-google-vertexai/src/main/java/org/apache/camel/component/google/vertexai/GoogleVertexAIProducer.java:
##########
@@ -130,18 +143,127 @@ private void generateChat(Exchange exchange) throws Exception {
     }
     private void generateChatStreaming(Exchange exchange) throws Exception {
-        // TODO: Implement streaming support using client.models.generateContentStream
-        throw new UnsupportedOperationException("Streaming is not yet implemented");
+        String prompt = getPrompt(exchange);
+        GenerateContentConfig config = buildConfig(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+        String streamOutputMode = endpoint.getConfiguration().getStreamOutputMode();
+
+        LOG.debug("Generating streaming content with model: {} and prompt: {}", modelId, prompt);
+
+        try (ResponseStream<GenerateContentResponse> stream
+                = client.models.generateContentStream(modelId, prompt, config)) {
+
+            StringBuilder fullText = new StringBuilder();
+            int chunkCount = 0;
+            GenerateContentResponse lastResponse = null;
+
+            for (GenerateContentResponse chunk : stream) {
+                chunkCount++;
+                lastResponse = chunk;
+                String chunkText = chunk.text();
+                if (chunkText != null) {
+                    fullText.append(chunkText);
+                }
+            }
+
+            Message message = getMessageForResponse(exchange);
+
+            if ("chunks".equals(streamOutputMode)) {
+                // In chunks mode, return the full accumulated text but include chunk count
+                message.setBody(fullText.toString());
+            } else {
+                // In complete mode (default), return the full accumulated text
+                message.setBody(fullText.toString());
+            }
+
+            message.setHeader(GoogleVertexAIConstants.CHUNK_COUNT, chunkCount);
+
+            if (lastResponse != null) {
+                setMetadataHeaders(exchange, lastResponse);
+            }
+        }
     }
     private void generateImage(Exchange exchange) throws Exception {
-        // TODO: Implement image generation using Imagen models
-        throw new UnsupportedOperationException("Image generation is not yet implemented");
+        String prompt = getPrompt(exchange);
+
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+
+        LOG.debug("Generating image with model: {} and prompt: {}", modelId, prompt);
+
+        GenerateImagesConfig.Builder configBuilder = GenerateImagesConfig.builder();
+
+        Integer numberOfImages = exchange.getIn().getHeader(GoogleVertexAIConstants.IMAGE_NUMBER_OF_IMAGES, Integer.class);
+        if (numberOfImages != null) {
+            configBuilder.numberOfImages(numberOfImages);
+        }
+
+        String aspectRatio = exchange.getIn().getHeader(GoogleVertexAIConstants.IMAGE_ASPECT_RATIO, String.class);
+        if (aspectRatio != null) {
+            configBuilder.aspectRatio(aspectRatio);
+        }
+
+        GenerateImagesResponse response = client.models.generateImages(modelId, prompt, configBuilder.build());
+
+        Message message = getMessageForResponse(exchange);
+
+        List<Image> images = response.images();
+        message.setBody(images);
+        message.setHeader(GoogleVertexAIConstants.GENERATED_IMAGES, images);
+        message.setHeader(GoogleVertexAIConstants.MODEL_ID, modelId);
     }
+    @SuppressWarnings("unchecked")
     private void generateEmbeddings(Exchange exchange) throws Exception {
-        // TODO: Implement embeddings generation
-        throw new UnsupportedOperationException("Embeddings generation is not yet implemented");
+        Client client = endpoint.getClient();
+        String modelId = endpoint.getConfiguration().getModelId();
+
+        LOG.debug("Generating embeddings with model: {}", modelId);
+
+        EmbedContentConfig.Builder configBuilder = EmbedContentConfig.builder();
+
+        String taskType = exchange.getIn().getHeader(GoogleVertexAIConstants.EMBEDDING_TASK_TYPE, String.class);
+        if (taskType != null) {
+            configBuilder.taskType(taskType);
+        }
+
+        Integer outputDimensionality
+                = exchange.getIn().getHeader(GoogleVertexAIConstants.EMBEDDING_OUTPUT_DIMENSIONALITY, Integer.class);
+        if (outputDimensionality != null) {
+            configBuilder.outputDimensionality(outputDimensionality);
+        }
+
+        EmbedContentConfig embedConfig = configBuilder.build();
+        Object body = exchange.getIn().getBody();
+        EmbedContentResponse response;
+
+        if (body instanceof List) {
+            // Batch embeddings
+            List<String> texts = (List<String>) body;
+            response = client.models.embedContent(modelId, texts, embedConfig);
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
The unchecked cast `(List<String>) body` is unsafe: if the body is a `List` of non-String objects (e.g., `List<Integer>`), this compiles fine but throws a confusing `ClassCastException` later when the SDK uses the elements. Consider:
```java
List<?> list = (List<?>) body;
List<String> texts = list.stream()
.map(Object::toString)
.collect(Collectors.toList());
```
or at least validate the element types.