This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 632c92f50d36 CAMEL-23105 - camel-docling - Fix broken async conversion
workflow (discarded result and error masking) (#21675)
632c92f50d36 is described below
commit 632c92f50d36e44329593e03685b38d27ce3c1ba
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Mar 2 13:07:34 2026 +0100
CAMEL-23105 - camel-docling - Fix broken async conversion workflow
(discarded result and error masking) (#21675)
The CompletionStage returned by convertSourceAsync() was discarded and
a fabricated task ID with no server-side correlation was returned. The
subsequent CHECK_CONVERSION_STATUS would fail silently because
checkConversionStatusInternal() returned COMPLETED on any exception.
Store the CompletableFuture in a ConcurrentHashMap keyed by a generated
task ID. CHECK_CONVERSION_STATUS now checks the local map first and
returns the actual future state (IN_PROGRESS, COMPLETED with result,
or FAILED with error message). Also fix the error path to return FAILED
instead of COMPLETED when an exception occurs.
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../camel/component/docling/DoclingProducer.java | 55 +++++-
.../docling/DoclingAsyncConversionTest.java | 200 +++++++++++++++++++++
2 files changed, 250 insertions(+), 5 deletions(-)
diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
index a1a04f6f70e9..8be170af8d10 100644
--- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
+++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
@@ -32,12 +32,14 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -87,6 +89,8 @@ public class DoclingProducer extends DefaultProducer {
private DoclingConfiguration configuration;
private DoclingServeApi doclingServeApi;
private ObjectMapper objectMapper;
+ private final Map<String, CompletableFuture<ConvertDocumentResponse>>
pendingAsyncTasks = new ConcurrentHashMap<>();
+ private final AtomicLong taskIdCounter = new AtomicLong();
public DoclingProducer(DoclingEndpoint endpoint) {
super(endpoint);
@@ -130,6 +134,9 @@ public class DoclingProducer extends DefaultProducer {
@Override
protected void doStop() throws Exception {
super.doStop();
+ // Cancel any pending async tasks
+ pendingAsyncTasks.forEach((id, future) -> future.cancel(true));
+ pendingAsyncTasks.clear();
if (doclingServeApi != null) {
doclingServeApi = null;
LOG.info("DoclingServeApi reference cleared");
@@ -287,10 +294,12 @@ public class DoclingProducer extends DefaultProducer {
// Start async conversion
ConvertDocumentRequest request = buildConvertRequest(inputPath,
outputFormat);
- CompletionStage<ConvertDocumentResponse> asyncResult =
doclingServeApi.convertSourceAsync(request);
+ CompletableFuture<ConvertDocumentResponse> asyncResult
+ =
doclingServeApi.convertSourceAsync(request).toCompletableFuture();
- // Generate a task ID for tracking
- String taskId = "task-" + System.currentTimeMillis() + "-" +
inputPath.hashCode();
+ // Generate a unique task ID and store the future for later status checks
+ String taskId = "task-" + taskIdCounter.incrementAndGet();
+ pendingAsyncTasks.put(taskId, asyncResult);
LOG.debug("Started async conversion with task ID: {}", taskId);
// Set task ID in body and header
@@ -345,6 +354,13 @@ public class DoclingProducer extends DefaultProducer {
private ConversionStatus checkConversionStatusInternal(String taskId) {
LOG.debug("Checking status for task: {}", taskId);
+ // Check the local pending tasks map first (tasks submitted via SUBMIT_ASYNC_CONVERSION)
+ CompletableFuture<ConvertDocumentResponse> future =
pendingAsyncTasks.get(taskId);
+ if (future != null) {
+ return checkLocalAsyncTask(taskId, future);
+ }
+
+ // Fall back to server-side task polling
try {
TaskStatusPollRequest pollRequest = TaskStatusPollRequest.builder()
.taskId(taskId)
@@ -374,8 +390,37 @@ public class DoclingProducer extends DefaultProducer {
return new ConversionStatus(taskId, status);
} catch (Exception e) {
LOG.warn("Failed to check task status for {}: {}", taskId,
e.getMessage());
- // If the task ID doesn't exist on the server, return a completed status as a fallback
- return new ConversionStatus(taskId,
ConversionStatus.Status.COMPLETED);
+ String errorMsg = e.getMessage() != null ? e.getMessage() :
e.getClass().getName();
+ return new ConversionStatus(taskId,
ConversionStatus.Status.FAILED, null, errorMsg, null);
+ }
+ }
+
+ private ConversionStatus checkLocalAsyncTask(String taskId,
CompletableFuture<ConvertDocumentResponse> future) {
+ if (!future.isDone()) {
+ return new ConversionStatus(taskId,
ConversionStatus.Status.IN_PROGRESS);
+ }
+
+ if (future.isCompletedExceptionally() || future.isCancelled()) {
+ // Remove completed task from map
+ pendingAsyncTasks.remove(taskId);
+ String errorMessage;
+ try {
+ future.join();
+ errorMessage = "Unknown error";
+ } catch (Exception e) {
+ errorMessage = e.getCause() != null ?
e.getCause().getMessage() : e.getMessage();
+ }
+ return new ConversionStatus(taskId,
ConversionStatus.Status.FAILED, null, errorMessage, null);
+ }
+
+ // Completed successfully — extract the result and remove from map
+ pendingAsyncTasks.remove(taskId);
+ try {
+ ConvertDocumentResponse response = future.join();
+ String result = extractConvertedContent(response,
configuration.getOutputFormat());
+ return new ConversionStatus(taskId,
ConversionStatus.Status.COMPLETED, result, null, null);
+ } catch (Exception e) {
+ return new ConversionStatus(taskId,
ConversionStatus.Status.FAILED, null, e.getMessage(), null);
}
}
diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java
new file mode 100644
index 000000000000..3bf29ebfe075
--- /dev/null
+++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.docling;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
+import ai.docling.serve.api.convert.response.DocumentResponse;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.test.junit6.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests the SUBMIT_ASYNC_CONVERSION and CHECK_CONVERSION_STATUS two-step async workflow.
+ *
+ * <p>
+ * Before the fix, the {@code CompletionStage} returned by {@code convertSourceAsync()} was discarded and a fabricated
+ * task ID with no server-side correlation was returned. CHECK_CONVERSION_STATUS would then fail because the server had
+ * no record of the fake ID, and the error was silently masked by returning COMPLETED.
+ *
+ * <p>
+ * After the fix, the {@code CompletableFuture} is stored in a local map keyed by the generated task ID. When
+ * CHECK_CONVERSION_STATUS is called, it checks the local map first and returns the actual status of the async task.
+ */
+class DoclingAsyncConversionTest extends CamelTestSupport {
+
+ @Test
+ void submitReturnsTaskIdLinkedToFuture() throws Exception {
+ DoclingEndpoint endpoint = context.getEndpoint(
+
"docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true",
DoclingEndpoint.class);
+ DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+ // Access the pendingAsyncTasks map via reflection to verify the future is stored
+ Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks =
getPendingAsyncTasks(producer);
+ assertNotNull(pendingTasks, "pendingAsyncTasks map should exist");
+ assertTrue(pendingTasks.isEmpty(), "pendingAsyncTasks should start
empty");
+ }
+
+ @Test
+ void checkStatusReturnsFailedForUnknownTaskId() throws Exception {
+ // When CHECK_CONVERSION_STATUS is called with an unknown task ID and the server
+ // is not available, it should return FAILED — not COMPLETED (the old bug).
+ try {
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setHeader(DoclingHeaders.TASK_ID,
"nonexistent-task-id");
+ exchange.getIn().setHeader(DoclingHeaders.OPERATION,
DoclingOperations.CHECK_CONVERSION_STATUS);
+
+ template.send("direct:check-status", exchange);
+
+ Object body = exchange.getIn().getBody();
+ assertInstanceOf(ConversionStatus.class, body);
+ ConversionStatus status = (ConversionStatus) body;
+
+ // The key assertion: unknown task IDs should NOT return COMPLETED
+ assertNotEquals(ConversionStatus.Status.COMPLETED,
status.getStatus(),
+ "Unknown task ID should not return COMPLETED status");
+ assertEquals(ConversionStatus.Status.FAILED, status.getStatus(),
+ "Unknown task ID with unavailable server should return
FAILED");
+ assertNotNull(status.getErrorMessage(), "Error message should be
populated");
+ } catch (CamelExecutionException e) {
+ // If the exchange throws instead of setting FAILED status, that's also acceptable —
+ // the important thing is it doesn't silently return COMPLETED
+ }
+ }
+
+ @Test
+ void checkStatusReturnsCompletedForFinishedLocalTask() throws Exception {
+ DoclingEndpoint endpoint = context.getEndpoint(
+
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true",
DoclingEndpoint.class);
+ DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+ // Manually insert a completed future into the pending tasks map
+ Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks =
getPendingAsyncTasks(producer);
+
+ // Create a completed future with a mock response
+ ConvertDocumentResponse mockResponse =
ConvertDocumentResponse.builder()
+ .document(DocumentResponse.builder()
+ .markdownContent("# Converted Document")
+ .build())
+ .build();
+ CompletableFuture<ConvertDocumentResponse> completedFuture =
CompletableFuture.completedFuture(mockResponse);
+ pendingTasks.put("test-task-1", completedFuture);
+
+ // Check the status — should find it in local map and return COMPLETED with result
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-1");
+
+ producer.process(exchange);
+
+ Object body = exchange.getIn().getBody();
+ assertInstanceOf(ConversionStatus.class, body);
+ ConversionStatus status = (ConversionStatus) body;
+ assertEquals(ConversionStatus.Status.COMPLETED, status.getStatus());
+ assertNotNull(status.getResult(), "Result should contain the converted
content");
+
+ // Future should be removed from map after completion
+ assertFalse(pendingTasks.containsKey("test-task-1"),
+ "Completed task should be removed from pending map");
+ }
+
+ @Test
+ void checkStatusReturnsInProgressForPendingLocalTask() throws Exception {
+ DoclingEndpoint endpoint = context.getEndpoint(
+
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true",
DoclingEndpoint.class);
+ DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+ Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks =
getPendingAsyncTasks(producer);
+
+ // Insert an incomplete future
+ CompletableFuture<ConvertDocumentResponse> incompleteFuture = new
CompletableFuture<>();
+ pendingTasks.put("test-task-2", incompleteFuture);
+
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-2");
+
+ producer.process(exchange);
+
+ Object body = exchange.getIn().getBody();
+ assertInstanceOf(ConversionStatus.class, body);
+ ConversionStatus status = (ConversionStatus) body;
+ assertEquals(ConversionStatus.Status.IN_PROGRESS, status.getStatus());
+
+ // Future should remain in map since it's not done yet
+ assertTrue(pendingTasks.containsKey("test-task-2"),
+ "In-progress task should remain in pending map");
+
+ // Clean up
+ incompleteFuture.cancel(true);
+ }
+
+ @Test
+ void checkStatusReturnsFailedForExceptionalLocalTask() throws Exception {
+ DoclingEndpoint endpoint = context.getEndpoint(
+
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true",
DoclingEndpoint.class);
+ DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+ Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks =
getPendingAsyncTasks(producer);
+
+ // Insert a failed future
+ CompletableFuture<ConvertDocumentResponse> failedFuture = new
CompletableFuture<>();
+ failedFuture.completeExceptionally(new RuntimeException("Server
connection refused"));
+ pendingTasks.put("test-task-3", failedFuture);
+
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-3");
+
+ producer.process(exchange);
+
+ Object body = exchange.getIn().getBody();
+ assertInstanceOf(ConversionStatus.class, body);
+ ConversionStatus status = (ConversionStatus) body;
+ assertEquals(ConversionStatus.Status.FAILED, status.getStatus());
+ assertNotNull(status.getErrorMessage());
+ assertTrue(status.getErrorMessage().contains("Server connection
refused"));
+
+ // Failed task should be removed from map
+ assertFalse(pendingTasks.containsKey("test-task-3"),
+ "Failed task should be removed from pending map");
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, CompletableFuture<ConvertDocumentResponse>>
getPendingAsyncTasks(DoclingProducer producer)
+ throws Exception {
+ Field field =
DoclingProducer.class.getDeclaredField("pendingAsyncTasks");
+ field.setAccessible(true);
+ return (Map<String, CompletableFuture<ConvertDocumentResponse>>)
field.get(producer);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:check-status")
+
.to("docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true");
+ }
+ };
+ }
+}