oscerd commented on code in PR #19676:
URL: https://github.com/apache/camel/pull/19676#discussion_r2456240729


##########
components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingServeClient.java:
##########
@@ -653,6 +662,124 @@ private String fetchTaskResult(String taskId) throws 
IOException {
         }
     }
 
+    /**
+     * Convert multiple documents in batch using parallel processing.
+     *
+     * @param  inputSources     List of file paths or URLs to documents
+     * @param  outputFormat     Output format (md, json, html, text)
+     * @param  batchSize        Maximum number of documents in this batch
+     * @param  parallelism      Number of parallel threads to use
+     * @param  failOnFirstError Whether to fail entire batch on first error
+     * @param  useAsync         Whether to use async mode for individual conversions
+     * @return                  BatchProcessingResults containing all conversion results
+     */
+    public BatchProcessingResults convertDocumentsBatch(
+            List<String> inputSources, String outputFormat, int batchSize, int 
parallelism,
+            boolean failOnFirstError, boolean useAsync) {
+
+        LOG.info("Starting batch conversion of {} documents with 
parallelism={}, failOnFirstError={}",
+                inputSources.size(), parallelism, failOnFirstError);
+
+        BatchProcessingResults results = new BatchProcessingResults();
+        results.setStartTimeMs(System.currentTimeMillis());
+
+        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+        List<Future<BatchConversionResult>> futures = new ArrayList<>();
+        AtomicInteger index = new AtomicInteger(0);
+
+        try {
+            // Submit all conversion tasks
+            for (String inputSource : inputSources) {
+                final int currentIndex = index.getAndIncrement();
+                final String documentId = "doc-" + currentIndex;
+
+                Callable<BatchConversionResult> task = () -> {
+                    BatchConversionResult result = new 
BatchConversionResult(documentId, inputSource);
+                    result.setBatchIndex(currentIndex);
+                    long startTime = System.currentTimeMillis();
+
+                    try {
+                        LOG.debug("Processing document {} (index {}): {}", 
documentId, currentIndex, inputSource);
+
+                        String converted;
+                        if (useAsync) {
+                            converted = 
convertDocumentAsyncAndWait(inputSource, outputFormat);
+                        } else {
+                            converted = convertDocument(inputSource, 
outputFormat);
+                        }
+
+                        result.setResult(converted);
+                        result.setSuccess(true);
+                        result.setProcessingTimeMs(System.currentTimeMillis() 
- startTime);
+
+                        LOG.debug("Successfully processed document {} in 
{}ms", documentId, result.getProcessingTimeMs());
+
+                    } catch (Exception e) {
+                        result.setSuccess(false);
+                        result.setErrorMessage(e.getMessage());
+                        result.setProcessingTimeMs(System.currentTimeMillis() 
- startTime);
+
+                        LOG.error("Failed to process document {} (index {}): 
{}", documentId, currentIndex,
+                                e.getMessage(), e);
+                    }
+
+                    return result;
+                };
+
+                futures.add(executor.submit(task));
+            }
+
+            // Collect results
+            for (Future<BatchConversionResult> future : futures) {
+                try {
+                    BatchConversionResult result = future.get();

Review Comment:
   It should be better now.



##########
components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java:
##########
@@ -261,6 +278,164 @@ private void processCheckConversionStatus(Exchange 
exchange) throws Exception {
         }
     }
 
+    private void processBatchConversion(Exchange exchange, String 
outputFormat) throws Exception {
+        LOG.debug("DoclingProducer processing batch conversion with format: 
{}", outputFormat);
+
+        if (!configuration.isUseDoclingServe()) {
+            throw new IllegalStateException(
+                    "Batch operations require docling-serve mode 
(useDoclingServe=true)");
+        }
+
+        // Extract document list from body
+        List<String> documentPaths = extractDocumentList(exchange);
+
+        if (documentPaths.isEmpty()) {
+            throw new IllegalArgumentException("No documents provided for 
batch processing");
+        }
+
+        LOG.debug("Processing batch of {} documents", documentPaths.size());
+
+        // Get batch configuration from headers or use defaults
+        int batchSize = exchange.getIn().getHeader(DoclingHeaders.BATCH_SIZE, 
configuration.getBatchSize(), Integer.class);
+        int parallelism
+                = exchange.getIn().getHeader(DoclingHeaders.BATCH_PARALLELISM, 
configuration.getBatchParallelism(),
+                        Integer.class);
+        boolean failOnFirstError = 
exchange.getIn().getHeader(DoclingHeaders.BATCH_FAIL_ON_FIRST_ERROR,
+                configuration.isBatchFailOnFirstError(), Boolean.class);
+
+        // Check if we should use async mode for individual conversions
+        boolean useAsync = configuration.isUseAsyncMode();
+        Boolean asyncModeHeader = 
exchange.getIn().getHeader(DoclingHeaders.USE_ASYNC_MODE, Boolean.class);
+        if (asyncModeHeader != null) {
+            useAsync = asyncModeHeader;
+        }
+
+        // Process batch using DoclingServeClient
+        BatchProcessingResults results = 
doclingServeClient.convertDocumentsBatch(
+                documentPaths, outputFormat, batchSize, parallelism, 
failOnFirstError, useAsync);
+
+        // Check if we should split results into individual exchanges
+        boolean splitResults = configuration.isSplitBatchResults();
+        Boolean splitResultsHeader = 
exchange.getIn().getHeader("CamelDoclingBatchSplitResults", Boolean.class);

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to