This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch task-18 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 15c3c4858d9fb4ef9c6eda6f2fb1b0bc685b8d29 Author: Andrea Cosentino <[email protected]> AuthorDate: Sun Mar 1 14:53:39 2026 +0100 CAMEL-23105 - camel-docling - Fix broken async conversion workflow (discarded result and error masking) The CompletionStage returned by convertSourceAsync() was discarded and a fabricated task ID with no server-side correlation was returned. The subsequent CHECK_CONVERSION_STATUS would fail silently because checkConversionStatusInternal() returned COMPLETED on any exception. Store the CompletableFuture in a ConcurrentHashMap keyed by a generated task ID. CHECK_CONVERSION_STATUS now checks the local map first and returns the actual future state (IN_PROGRESS, COMPLETED with result, or FAILED with error message). Also fix the error path to return FAILED instead of COMPLETED when an exception occurs. Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/component/docling/DoclingProducer.java | 55 +++++- .../docling/DoclingAsyncConversionTest.java | 200 +++++++++++++++++++++ 2 files changed, 250 insertions(+), 5 deletions(-) diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java index a1a04f6f70e9..8be170af8d10 100644 --- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java +++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java @@ -32,12 +32,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -87,6 +89,8 @@ public class DoclingProducer extends DefaultProducer { private DoclingConfiguration configuration; private DoclingServeApi doclingServeApi; private ObjectMapper objectMapper; + private final Map<String, CompletableFuture<ConvertDocumentResponse>> pendingAsyncTasks = new ConcurrentHashMap<>(); + private final AtomicLong taskIdCounter = new AtomicLong(); public DoclingProducer(DoclingEndpoint endpoint) { super(endpoint); @@ -130,6 +134,9 @@ public class DoclingProducer extends DefaultProducer { @Override protected void doStop() throws Exception { super.doStop(); + // Cancel any pending async tasks + pendingAsyncTasks.forEach((id, future) -> future.cancel(true)); + pendingAsyncTasks.clear(); if (doclingServeApi != null) { doclingServeApi = null; LOG.info("DoclingServeApi reference cleared"); @@ -287,10 +294,12 @@ public class DoclingProducer extends DefaultProducer { // Start async conversion ConvertDocumentRequest request = buildConvertRequest(inputPath, outputFormat); - CompletionStage<ConvertDocumentResponse> asyncResult = doclingServeApi.convertSourceAsync(request); + CompletableFuture<ConvertDocumentResponse> asyncResult + = doclingServeApi.convertSourceAsync(request).toCompletableFuture(); - // Generate a task ID for tracking - String taskId = "task-" + System.currentTimeMillis() + "-" + inputPath.hashCode(); + // Generate a unique task ID and store the future for later status checks + String taskId = "task-" + taskIdCounter.incrementAndGet(); + pendingAsyncTasks.put(taskId, asyncResult); LOG.debug("Started async conversion with task ID: {}", taskId); // Set task ID in body and header @@ -345,6 +354,13 @@ public class DoclingProducer extends DefaultProducer { private ConversionStatus checkConversionStatusInternal(String taskId) { LOG.debug("Checking status for task: {}", taskId); +
// Check the local pending tasks map first (tasks submitted via SUBMIT_ASYNC_CONVERSION) + CompletableFuture<ConvertDocumentResponse> future = pendingAsyncTasks.get(taskId); + if (future != null) { + return checkLocalAsyncTask(taskId, future); + } + + // Fall back to server-side task polling try { TaskStatusPollRequest pollRequest = TaskStatusPollRequest.builder() .taskId(taskId) @@ -374,8 +390,37 @@ public class DoclingProducer extends DefaultProducer { return new ConversionStatus(taskId, status); } catch (Exception e) { LOG.warn("Failed to check task status for {}: {}", taskId, e.getMessage()); - // If the task ID doesn't exist on the server, return a completed status as a fallback - return new ConversionStatus(taskId, ConversionStatus.Status.COMPLETED); + String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getName(); + return new ConversionStatus(taskId, ConversionStatus.Status.FAILED, null, errorMsg, null); + } + } + + private ConversionStatus checkLocalAsyncTask(String taskId, CompletableFuture<ConvertDocumentResponse> future) { + if (!future.isDone()) { + return new ConversionStatus(taskId, ConversionStatus.Status.IN_PROGRESS); + } + + if (future.isCompletedExceptionally()) { + // Failed or cancelled (isCompletedExceptionally() covers both): stop tracking and report FAILED + pendingAsyncTasks.remove(taskId); + String errorMessage = "Unknown error"; + try { + future.join(); + } catch (Exception e) {
+ Throwable cause = e.getCause() != null ? e.getCause() : e; // unwrap CompletionException + errorMessage = cause.getMessage() != null ? cause.getMessage() : cause.getClass().getName(); + } + return new ConversionStatus(taskId, ConversionStatus.Status.FAILED, null, errorMessage, null); + } + + // Succeeded. NOTE(review): extraction uses the configured output format; submit may have used a per-exchange one + pendingAsyncTasks.remove(taskId); + try { + String result = extractConvertedContent(future.join(), configuration.getOutputFormat()); + return new ConversionStatus(taskId, ConversionStatus.Status.COMPLETED, result, null, null); + } catch (Exception e) { + String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getName(); + return new ConversionStatus(taskId, ConversionStatus.Status.FAILED, null, errorMsg, null); + } + } diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java new file mode 100644 index 000000000000..3bf29ebfe075 --- /dev/null +++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.camel.component.docling; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import ai.docling.serve.api.convert.response.ConvertDocumentResponse; +import ai.docling.serve.api.convert.response.DocumentResponse; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.test.junit6.CamelTestSupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests the SUBMIT_ASYNC_CONVERSION and CHECK_CONVERSION_STATUS two-step async workflow. + * + * <p> + * Before the fix, the {@code CompletionStage} returned by {@code convertSourceAsync()} was discarded and a fabricated + * task ID with no server-side correlation was returned. CHECK_CONVERSION_STATUS would then fail because the server had + * no record of the fake ID, and the error was silently masked by returning COMPLETED. + * + * <p> + * After the fix, the {@code CompletableFuture} is stored in a local map keyed by the generated task ID. When + * CHECK_CONVERSION_STATUS is called, it checks the local map first and returns the actual status of the async task.
+ */ +class DoclingAsyncConversionTest extends CamelTestSupport { + + @Test + void pendingAsyncTasksMapStartsEmpty() throws Exception { + DoclingEndpoint endpoint = context.getEndpoint( + "docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true", DoclingEndpoint.class); + DoclingProducer producer = (DoclingProducer) endpoint.createProducer(); + + // Submitting a conversion calls the docling-serve API, which is not available here, so this only + // verifies (via reflection) that the task-tracking map exists and starts empty. + Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer); + assertNotNull(pendingTasks, "pendingAsyncTasks map should exist"); + assertTrue(pendingTasks.isEmpty(), "pendingAsyncTasks should start empty"); + } + + @Test + void checkStatusReturnsFailedForUnknownTaskId() throws Exception { + // When CHECK_CONVERSION_STATUS is called with an unknown task ID and the server + // is not available, it should return FAILED — not COMPLETED (the old bug). + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "nonexistent-task-id"); + exchange.getIn().setHeader(DoclingHeaders.OPERATION, DoclingOperations.CHECK_CONVERSION_STATUS); + + template.send("direct:check-status", exchange); + + // template.send() does not throw on processing errors; it records them on the exchange. + // The failure must therefore be reported through the ConversionStatus body instead. + assertNull(exchange.getException(), + "Status check failure should be reported in the body, not thrown"); + + Object body = exchange.getIn().getBody(); + assertInstanceOf(ConversionStatus.class, body); + ConversionStatus status = (ConversionStatus) body; + + // The key assertion: unknown task IDs should NOT return COMPLETED + assertNotEquals(ConversionStatus.Status.COMPLETED, status.getStatus(), + "Unknown task ID should not return COMPLETED status"); + assertEquals(ConversionStatus.Status.FAILED, status.getStatus(), + "Unknown task ID with unavailable server should return FAILED"); + assertNotNull(status.getErrorMessage(), "Error message should be populated"); + } + +
@Test + void checkStatusReturnsCompletedForFinishedLocalTask() throws Exception { + DoclingEndpoint endpoint = context.getEndpoint( + "docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", DoclingEndpoint.class); + DoclingProducer producer = (DoclingProducer) endpoint.createProducer(); + + // Manually insert a completed future into the pending tasks map + Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer); + + // Create a completed future with a mock response + ConvertDocumentResponse mockResponse = ConvertDocumentResponse.builder() + .document(DocumentResponse.builder() + .markdownContent("# Converted Document") + .build()) + .build(); + CompletableFuture<ConvertDocumentResponse> completedFuture = CompletableFuture.completedFuture(mockResponse); + pendingTasks.put("test-task-1", completedFuture); + + // Check the status — should find it in local map and return COMPLETED with result + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-1"); + + producer.process(exchange); + + Object body = exchange.getIn().getBody(); + assertInstanceOf(ConversionStatus.class, body); + ConversionStatus status = (ConversionStatus) body; + assertEquals(ConversionStatus.Status.COMPLETED, status.getStatus()); + assertNotNull(status.getResult(), "Result should contain the converted content"); + + // Future should be removed from map after completion + assertFalse(pendingTasks.containsKey("test-task-1"), + "Completed task should be removed from pending map"); + } + + @Test + void checkStatusReturnsInProgressForPendingLocalTask() throws Exception { + DoclingEndpoint endpoint = context.getEndpoint( + "docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", DoclingEndpoint.class); + DoclingProducer producer = (DoclingProducer) endpoint.createProducer(); + + Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks =
getPendingAsyncTasks(producer); + + // Insert an incomplete future + CompletableFuture<ConvertDocumentResponse> incompleteFuture = new CompletableFuture<>(); + pendingTasks.put("test-task-2", incompleteFuture); + + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-2"); + + producer.process(exchange); + + Object body = exchange.getIn().getBody(); + assertInstanceOf(ConversionStatus.class, body); + ConversionStatus status = (ConversionStatus) body; + assertEquals(ConversionStatus.Status.IN_PROGRESS, status.getStatus()); + + // Future should remain in map since it's not done yet + assertTrue(pendingTasks.containsKey("test-task-2"), + "In-progress task should remain in pending map"); + + // Clean up + incompleteFuture.cancel(true); + } + + @Test + void checkStatusReturnsFailedForExceptionalLocalTask() throws Exception { + DoclingEndpoint endpoint = context.getEndpoint( + "docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", DoclingEndpoint.class); + DoclingProducer producer = (DoclingProducer) endpoint.createProducer(); + + Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer); + + // Insert a failed future + CompletableFuture<ConvertDocumentResponse> failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new RuntimeException("Server connection refused")); + pendingTasks.put("test-task-3", failedFuture); + + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-3"); + + producer.process(exchange); + + Object body = exchange.getIn().getBody(); + assertInstanceOf(ConversionStatus.class, body); + ConversionStatus status = (ConversionStatus) body; + assertEquals(ConversionStatus.Status.FAILED, status.getStatus()); + assertNotNull(status.getErrorMessage()); + assertTrue(status.getErrorMessage().contains("Server connection refused")); + + // Failed task should be
removed from map + assertFalse(pendingTasks.containsKey("test-task-3"), + "Failed task should be removed from pending map"); + } + + @SuppressWarnings("unchecked") + private Map<String, CompletableFuture<ConvertDocumentResponse>> getPendingAsyncTasks(DoclingProducer producer) + throws Exception { + Field field = DoclingProducer.class.getDeclaredField("pendingAsyncTasks"); + field.setAccessible(true); + return (Map<String, CompletableFuture<ConvertDocumentResponse>>) field.get(producer); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:check-status") + .to("docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true"); + } + }; + } +}
