oscerd commented on code in PR #19676:
URL: https://github.com/apache/camel/pull/19676#discussion_r2456240729
##########
components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingServeClient.java:
##########
@@ -653,6 +662,124 @@ private String fetchTaskResult(String taskId) throws IOException {
}
}
+ /**
+ * Convert multiple documents in batch using parallel processing.
+ *
+ * @param inputSources List of file paths or URLs to documents
+ * @param outputFormat Output format (md, json, html, text)
+ * @param batchSize Maximum number of documents in this batch
+ * @param parallelism Number of parallel threads to use
+ * @param failOnFirstError Whether to fail entire batch on first error
+ * @param useAsync Whether to use async mode for individual
conversions
+ * @return BatchProcessingResults containing all
conversion results
+ */
+ public BatchProcessingResults convertDocumentsBatch(
+ List<String> inputSources, String outputFormat, int batchSize, int
parallelism,
+ boolean failOnFirstError, boolean useAsync) {
+
+ LOG.info("Starting batch conversion of {} documents with
parallelism={}, failOnFirstError={}",
+ inputSources.size(), parallelism, failOnFirstError);
+
+ BatchProcessingResults results = new BatchProcessingResults();
+ results.setStartTimeMs(System.currentTimeMillis());
+
+ ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+ List<Future<BatchConversionResult>> futures = new ArrayList<>();
+ AtomicInteger index = new AtomicInteger(0);
+
+ try {
+ // Submit all conversion tasks
+ for (String inputSource : inputSources) {
+ final int currentIndex = index.getAndIncrement();
+ final String documentId = "doc-" + currentIndex;
+
+ Callable<BatchConversionResult> task = () -> {
+ BatchConversionResult result = new
BatchConversionResult(documentId, inputSource);
+ result.setBatchIndex(currentIndex);
+ long startTime = System.currentTimeMillis();
+
+ try {
+ LOG.debug("Processing document {} (index {}): {}",
documentId, currentIndex, inputSource);
+
+ String converted;
+ if (useAsync) {
+ converted =
convertDocumentAsyncAndWait(inputSource, outputFormat);
+ } else {
+ converted = convertDocument(inputSource,
outputFormat);
+ }
+
+ result.setResult(converted);
+ result.setSuccess(true);
+ result.setProcessingTimeMs(System.currentTimeMillis()
- startTime);
+
+ LOG.debug("Successfully processed document {} in
{}ms", documentId, result.getProcessingTimeMs());
+
+ } catch (Exception e) {
+ result.setSuccess(false);
+ result.setErrorMessage(e.getMessage());
+ result.setProcessingTimeMs(System.currentTimeMillis()
- startTime);
+
+ LOG.error("Failed to process document {} (index {}):
{}", documentId, currentIndex,
+ e.getMessage(), e);
+ }
+
+ return result;
+ };
+
+ futures.add(executor.submit(task));
+ }
+
+ // Collect results
+ for (Future<BatchConversionResult> future : futures) {
+ try {
+ BatchConversionResult result = future.get();
Review Comment:
It should be better now.
##########
components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java:
##########
@@ -261,6 +278,164 @@ private void processCheckConversionStatus(Exchange exchange) throws Exception {
}
}
+ private void processBatchConversion(Exchange exchange, String
outputFormat) throws Exception {
+ LOG.debug("DoclingProducer processing batch conversion with format:
{}", outputFormat);
+
+ if (!configuration.isUseDoclingServe()) {
+ throw new IllegalStateException(
+ "Batch operations require docling-serve mode
(useDoclingServe=true)");
+ }
+
+ // Extract document list from body
+ List<String> documentPaths = extractDocumentList(exchange);
+
+ if (documentPaths.isEmpty()) {
+ throw new IllegalArgumentException("No documents provided for
batch processing");
+ }
+
+ LOG.debug("Processing batch of {} documents", documentPaths.size());
+
+ // Get batch configuration from headers or use defaults
+ int batchSize = exchange.getIn().getHeader(DoclingHeaders.BATCH_SIZE,
configuration.getBatchSize(), Integer.class);
+ int parallelism
+ = exchange.getIn().getHeader(DoclingHeaders.BATCH_PARALLELISM,
configuration.getBatchParallelism(),
+ Integer.class);
+ boolean failOnFirstError =
exchange.getIn().getHeader(DoclingHeaders.BATCH_FAIL_ON_FIRST_ERROR,
+ configuration.isBatchFailOnFirstError(), Boolean.class);
+
+ // Check if we should use async mode for individual conversions
+ boolean useAsync = configuration.isUseAsyncMode();
+ Boolean asyncModeHeader =
exchange.getIn().getHeader(DoclingHeaders.USE_ASYNC_MODE, Boolean.class);
+ if (asyncModeHeader != null) {
+ useAsync = asyncModeHeader;
+ }
+
+ // Process batch using DoclingServeClient
+ BatchProcessingResults results =
doclingServeClient.convertDocumentsBatch(
+ documentPaths, outputFormat, batchSize, parallelism,
failOnFirstError, useAsync);
+
+ // Check if we should split results into individual exchanges
+ boolean splitResults = configuration.isSplitBatchResults();
+ Boolean splitResultsHeader =
exchange.getIn().getHeader("CamelDoclingBatchSplitResults", Boolean.class);
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]