This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch task-20 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 586aa02d184ac31a91afef35e1a19451aaa75fa7
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Mar 2 18:35:50 2026 +0100

    CAMEL-23120 - camel-docling - Implement batchSize sub-batch partitioning in batch processing

    The batchSize configuration parameter (default 10) was declared and read from headers in processBatchConversion() and processBatchStructuredData(), but the value was never actually applied. Both convertDocumentsBatch() and convertStructuredDataBatch() submitted all documents to the executor at once regardless of batchSize, making the parameter a no-op.

    This change makes batchSize control how many documents are submitted per sub-batch. Documents are partitioned into chunks of batchSize, and each sub-batch is processed to completion before the next one starts. Within each sub-batch, up to batchParallelism threads run concurrently. The overall batchTimeout is tracked across sub-batches, so the remaining time decreases as sub-batches complete, and failOnFirstError stops processing across sub-batch boundaries.

    This provides back-pressure and controls memory usage when processing large document sets, preventing the creation of unbounded numbers of CompletableFutures.
Signed-off-by: Andrea Cosentino <[email protected]> --- .../apache/camel/catalog/components/docling.json | 4 +- .../apache/camel/component/docling/docling.json | 4 +- .../component/docling/DoclingConfiguration.java | 5 +- .../camel/component/docling/DoclingProducer.java | 344 ++++++++++++--------- .../component/docling/BatchProcessingTest.java | 10 + .../dsl/DoclingComponentBuilderFactory.java | 7 +- .../dsl/DoclingEndpointBuilderFactory.java | 14 +- 7 files changed, 226 insertions(+), 162 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json index 362c01fdb5d0..e10b6a98e0c2 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json @@ -61,7 +61,7 @@ "workingDirectory": { "index": 34, "kind": "property", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 35, "kind": "property", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or conti [...] 
"batchParallelism": { "index": 36, "kind": "property", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub-b [...] 
"batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 39, "kind": "property", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per do [...] "chunkingIncludeRawText": { "index": 40, "kind": "property", "displayName": "Chunking Include Raw Text", "group": "chunking", "label": "chunking", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include raw text in chunk output" }, @@ -147,7 +147,7 @@ "workingDirectory": { "index": 33, "kind": "parameter", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 34, 
"kind": "parameter", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or cont [...] "batchParallelism": { "index": 35, "kind": "parameter", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub- [...] 
"batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 38, "kind": "parameter", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per d [...] "chunkingIncludeRawText": { "index": 39, "kind": "parameter", "displayName": "Chunking Include Raw Text", "group": "chunking", "label": "chunking", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include raw text in chunk output" }, diff --git a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json index 362c01fdb5d0..e10b6a98e0c2 100644 --- a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json +++ b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json 
@@ -61,7 +61,7 @@ "workingDirectory": { "index": 34, "kind": "property", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 35, "kind": "property", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or conti [...] "batchParallelism": { "index": 36, "kind": "property", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 37, "kind": "property", 
"displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub-b [...] "batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 39, "kind": "property", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per do [...] 
"chunkingIncludeRawText": { "index": 40, "kind": "property", "displayName": "Chunking Include Raw Text", "group": "chunking", "label": "chunking", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include raw text in chunk output" }, @@ -147,7 +147,7 @@ "workingDirectory": { "index": 33, "kind": "parameter", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 34, "kind": "parameter", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or cont [...] 
"batchParallelism": { "index": 35, "kind": "parameter", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub- [...] 
"batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 38, "kind": "parameter", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per d [...] "chunkingIncludeRawText": { "index": 39, "kind": "parameter", "displayName": "Chunking Include Raw Text", "group": "chunking", "label": "chunking", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include raw text in chunk output" }, diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java index a67ae6113b3a..3270a9adde80 100644 --- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java +++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java @@ -113,7 +113,10 @@ 
public class DoclingConfiguration implements Cloneable { private long asyncTimeout = 300000; // 5 minutes @UriParam(label = "batch") - @Metadata(description = "Maximum number of documents to process in a single batch (batch operations only)", + @Metadata(description = "Number of documents to submit per sub-batch. Documents are partitioned into sub-batches of this size" + + " and each sub-batch is processed before starting the next one. Within each sub-batch, up to" + + " batchParallelism threads are used concurrently. This controls memory usage and back-pressure" + + " when processing large document sets.", defaultValue = "10") private int batchSize = 10; diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java index 8be170af8d10..df0f851be49d 100644 --- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java +++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java @@ -764,8 +764,9 @@ public class DoclingProducer extends DefaultProducer { List<String> inputSources, String outputFormat, int batchSize, int parallelism, boolean failOnFirstError, boolean useAsync, long batchTimeout) { - LOG.info("Starting batch conversion of {} documents with parallelism={}, failOnFirstError={}, timeout={}ms", - inputSources.size(), parallelism, failOnFirstError, batchTimeout); + LOG.info( + "Starting batch conversion of {} documents with batchSize={}, parallelism={}, failOnFirstError={}, timeout={}ms", + inputSources.size(), batchSize, parallelism, failOnFirstError, batchTimeout); BatchProcessingResults results = new BatchProcessingResults(); results.setStartTimeMs(System.currentTimeMillis()); @@ -776,101 +777,118 @@ public class DoclingProducer extends DefaultProducer { AtomicBoolean shouldCancel = new 
AtomicBoolean(false); try { - // Create CompletableFutures for all conversion tasks - List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); - - for (String inputSource : inputSources) { - final int currentIndex = index.getAndIncrement(); - final String documentId = "doc-" + currentIndex; - - CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { - // Check if we should skip this task due to early termination - if (failOnFirstError && shouldCancel.get()) { - BatchConversionResult cancelledResult = new BatchConversionResult(documentId, inputSource); - cancelledResult.setBatchIndex(currentIndex); - cancelledResult.setSuccess(false); - cancelledResult.setErrorMessage("Cancelled due to previous failure"); - return cancelledResult; - } + // Partition documents into sub-batches of batchSize + for (int batchStart = 0; batchStart < inputSources.size(); batchStart += batchSize) { + if (failOnFirstError && shouldCancel.get()) { + break; + } - BatchConversionResult result = new BatchConversionResult(documentId, inputSource); - result.setBatchIndex(currentIndex); - long startTime = System.currentTimeMillis(); + int batchEnd = Math.min(batchStart + batchSize, inputSources.size()); + List<String> subBatch = inputSources.subList(batchStart, batchEnd); - try { - LOG.debug("Processing document {} (index {}): {}", documentId, currentIndex, inputSource); + LOG.debug("Processing sub-batch [{}-{}] of {} total documents", + batchStart, batchEnd - 1, inputSources.size()); - String converted; - if (useAsync) { - converted = convertDocumentAsyncAndWait(inputSource, outputFormat); - } else { - converted = convertDocumentSync(inputSource, outputFormat); + List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); + + for (String inputSource : subBatch) { + final int currentIndex = index.getAndIncrement(); + final String documentId = "doc-" + currentIndex; + + CompletableFuture<BatchConversionResult> future = 
CompletableFuture.supplyAsync(() -> { + // Check if we should skip this task due to early termination + if (failOnFirstError && shouldCancel.get()) { + BatchConversionResult cancelledResult = new BatchConversionResult(documentId, inputSource); + cancelledResult.setBatchIndex(currentIndex); + cancelledResult.setSuccess(false); + cancelledResult.setErrorMessage("Cancelled due to previous failure"); + return cancelledResult; } - result.setResult(converted); - result.setSuccess(true); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + BatchConversionResult result = new BatchConversionResult(documentId, inputSource); + result.setBatchIndex(currentIndex); + long startTime = System.currentTimeMillis(); - LOG.debug("Successfully processed document {} in {}ms", documentId, - result.getProcessingTimeMs()); + try { + LOG.debug("Processing document {} (index {}): {}", documentId, currentIndex, inputSource); - } catch (Exception e) { - result.setSuccess(false); - result.setErrorMessage(e.getMessage()); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + String converted; + if (useAsync) { + converted = convertDocumentAsyncAndWait(inputSource, outputFormat); + } else { + converted = convertDocumentSync(inputSource, outputFormat); + } + + result.setResult(converted); + result.setSuccess(true); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + + LOG.debug("Successfully processed document {} in {}ms", documentId, + result.getProcessingTimeMs()); - LOG.error("Failed to process document {} (index {}): {}", documentId, currentIndex, - e.getMessage(), e); + } catch (Exception e) { + result.setSuccess(false); + result.setErrorMessage(e.getMessage()); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); - // Signal other tasks to cancel if failOnFirstError is enabled - if (failOnFirstError) { - shouldCancel.set(true); + LOG.error("Failed to process document {} (index {}): {}", documentId, currentIndex, + 
e.getMessage(), e); + + // Signal other tasks to cancel if failOnFirstError is enabled + if (failOnFirstError) { + shouldCancel.set(true); + } } - } - return result; - }, executor); + return result; + }, executor); - futures.add(future); - } + futures.add(future); + } - // Wait for all futures to complete with timeout - CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + // Wait for sub-batch to complete with remaining timeout + long elapsed = System.currentTimeMillis() - results.getStartTimeMs(); + long remainingTimeout = batchTimeout - elapsed; + if (remainingTimeout <= 0) { + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing timed out after " + batchTimeout + "ms"); + } - try { - allOf.get(batchTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - LOG.error("Batch processing timed out after {}ms", batchTimeout); - // Cancel all incomplete futures - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch processing timed out after " + batchTimeout + "ms", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Batch processing interrupted", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch processing interrupted", e); - } catch (Exception e) { - LOG.error("Batch processing failed", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch processing failed", e); - } + CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - // Collect all results - for (CompletableFuture<BatchConversionResult> future : futures) { try { - BatchConversionResult result = future.getNow(null); - if (result != null) { - results.addResult(result); - - // If failOnFirstError and we hit a failure, stop adding more results - if (failOnFirstError && !result.isSuccess()) { - LOG.warn("Failing batch due to error in document {}: {}", 
result.getDocumentId(), - result.getErrorMessage()); - break; + allOf.get(remainingTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Batch processing timed out after {}ms", batchTimeout); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing timed out after " + batchTimeout + "ms", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Batch processing interrupted", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing interrupted", e); + } catch (Exception e) { + LOG.error("Batch processing failed", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing failed", e); + } + + // Collect results from this sub-batch + for (CompletableFuture<BatchConversionResult> future : futures) { + try { + BatchConversionResult result = future.getNow(null); + if (result != null) { + results.addResult(result); + + if (failOnFirstError && !result.isSuccess()) { + LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), + result.getErrorMessage()); + break; + } } + } catch (Exception e) { + LOG.error("Error retrieving result", e); } - } catch (Exception e) { - LOG.error("Error retrieving result", e); } } @@ -1115,8 +1133,8 @@ public class DoclingProducer extends DefaultProducer { boolean failOnFirstError, boolean useAsync, long batchTimeout) { LOG.info( - "Starting batch structured data extraction of {} documents with parallelism={}, failOnFirstError={}, timeout={}ms", - inputSources.size(), parallelism, failOnFirstError, batchTimeout); + "Starting batch structured data extraction of {} documents with batchSize={}, parallelism={}, failOnFirstError={}, timeout={}ms", + inputSources.size(), batchSize, parallelism, failOnFirstError, batchTimeout); BatchProcessingResults results = new BatchProcessingResults(); results.setStartTimeMs(System.currentTimeMillis()); @@ -1127,93 +1145,117 @@ public 
class DoclingProducer extends DefaultProducer { AtomicBoolean shouldCancel = new AtomicBoolean(false); try { - List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); - - for (String inputSource : inputSources) { - final int currentIndex = index.getAndIncrement(); - final String documentId = "doc-" + currentIndex; - - CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { - if (failOnFirstError && shouldCancel.get()) { - BatchConversionResult cancelledResult = new BatchConversionResult(documentId, inputSource); - cancelledResult.setBatchIndex(currentIndex); - cancelledResult.setSuccess(false); - cancelledResult.setErrorMessage("Cancelled due to previous failure"); - return cancelledResult; - } + // Partition documents into sub-batches of batchSize + for (int batchStart = 0; batchStart < inputSources.size(); batchStart += batchSize) { + if (failOnFirstError && shouldCancel.get()) { + break; + } - BatchConversionResult result = new BatchConversionResult(documentId, inputSource); - result.setBatchIndex(currentIndex); - long startTime = System.currentTimeMillis(); + int batchEnd = Math.min(batchStart + batchSize, inputSources.size()); + List<String> subBatch = inputSources.subList(batchStart, batchEnd); - try { - LOG.debug("Extracting structured data from document {} (index {}): {}", documentId, currentIndex, - inputSource); + LOG.debug("Processing sub-batch [{}-{}] of {} total documents", + batchStart, batchEnd - 1, inputSources.size()); - ConvertDocumentRequest request = buildStructuredDataRequest(inputSource); - String converted; - if (useAsync) { - converted = convertDocumentAsyncAndWait(request); - } else { - converted = convertDocumentSync(request); + List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); + + for (String inputSource : subBatch) { + final int currentIndex = index.getAndIncrement(); + final String documentId = "doc-" + currentIndex; + + 
CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { + if (failOnFirstError && shouldCancel.get()) { + BatchConversionResult cancelledResult = new BatchConversionResult(documentId, inputSource); + cancelledResult.setBatchIndex(currentIndex); + cancelledResult.setSuccess(false); + cancelledResult.setErrorMessage("Cancelled due to previous failure"); + return cancelledResult; } - result.setResult(converted); - result.setSuccess(true); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + BatchConversionResult result = new BatchConversionResult(documentId, inputSource); + result.setBatchIndex(currentIndex); + long startTime = System.currentTimeMillis(); - } catch (Exception e) { - result.setSuccess(false); - result.setErrorMessage(e.getMessage()); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + try { + LOG.debug("Extracting structured data from document {} (index {}): {}", documentId, + currentIndex, inputSource); - LOG.error("Failed to extract structured data from document {} (index {}): {}", documentId, - currentIndex, e.getMessage(), e); + ConvertDocumentRequest request = buildStructuredDataRequest(inputSource); + String converted; + if (useAsync) { + converted = convertDocumentAsyncAndWait(request); + } else { + converted = convertDocumentSync(request); + } + + result.setResult(converted); + result.setSuccess(true); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + + } catch (Exception e) { + result.setSuccess(false); + result.setErrorMessage(e.getMessage()); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + + LOG.error("Failed to extract structured data from document {} (index {}): {}", documentId, + currentIndex, e.getMessage(), e); - if (failOnFirstError) { - shouldCancel.set(true); + if (failOnFirstError) { + shouldCancel.set(true); + } } - } - return result; - }, executor); + return result; + }, executor); - futures.add(future); - } + 
futures.add(future); + } - CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + // Wait for sub-batch to complete with remaining timeout + long elapsed = System.currentTimeMillis() - results.getStartTimeMs(); + long remainingTimeout = batchTimeout - elapsed; + if (remainingTimeout <= 0) { + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException( + "Batch structured data extraction timed out after " + batchTimeout + "ms"); + } - try { - allOf.get(batchTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - LOG.error("Batch structured data extraction timed out after {}ms", batchTimeout); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch structured data extraction timed out after " + batchTimeout + "ms", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Batch structured data extraction interrupted", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch structured data extraction interrupted", e); - } catch (Exception e) { - LOG.error("Batch structured data extraction failed", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch structured data extraction failed", e); - } + CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - for (CompletableFuture<BatchConversionResult> future : futures) { try { - BatchConversionResult result = future.getNow(null); - if (result != null) { - results.addResult(result); - - if (failOnFirstError && !result.isSuccess()) { - LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), - result.getErrorMessage()); - break; + allOf.get(remainingTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Batch structured data extraction timed out after {}ms", batchTimeout); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException( + "Batch structured 
data extraction timed out after " + batchTimeout + "ms", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Batch structured data extraction interrupted", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch structured data extraction interrupted", e); + } catch (Exception e) { + LOG.error("Batch structured data extraction failed", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch structured data extraction failed", e); + } + + // Collect results from this sub-batch + for (CompletableFuture<BatchConversionResult> future : futures) { + try { + BatchConversionResult result = future.getNow(null); + if (result != null) { + results.addResult(result); + + if (failOnFirstError && !result.isSuccess()) { + LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), + result.getErrorMessage()); + break; + } } + } catch (Exception e) { + LOG.error("Error retrieving result", e); } - } catch (Exception e) { - LOG.error("Error retrieving result", e); } } diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java index e1d902ce1c29..a70bda0e10bd 100644 --- a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java +++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java @@ -141,6 +141,16 @@ public class BatchProcessingTest extends CamelTestSupport { assertTrue(config.isSplitBatchResults()); } + @Test + public void testBatchSizeParsedFromEndpointUri() throws Exception { + DoclingEndpoint endpoint = (DoclingEndpoint) context.getEndpoint( + "docling:convert?useDoclingServe=true&batchSize=5&batchParallelism=2"); + DoclingConfiguration config = endpoint.getConfiguration(); + + assertEquals(5, 
config.getBatchSize()); + assertEquals(2, config.getBatchParallelism()); + } + @Test public void testBatchTimeoutConfiguration() { DoclingConfiguration config = new DoclingConfiguration(); diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java index ddfac58453aa..97365d57ab3a 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java @@ -678,8 +678,11 @@ public interface DoclingComponentBuilderFactory { /** - * Maximum number of documents to process in a single batch (batch - * operations only). + * Number of documents to submit per sub-batch. Documents are + * partitioned into sub-batches of this size and each sub-batch is + * processed before starting the next one. Within each sub-batch, up to + * batchParallelism threads are used concurrently. This controls memory + * usage and back-pressure when processing large document sets. * * The option is a: <code>int</code> type. * diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java index 0ee94f1e577f..8801cda626ee 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java @@ -310,8 +310,11 @@ public interface DoclingEndpointBuilderFactory { return this; } /** - * Maximum number of documents to process in a single batch (batch - * operations only). 
+ * Number of documents to submit per sub-batch. Documents are + * partitioned into sub-batches of this size and each sub-batch is + * processed before starting the next one. Within each sub-batch, up to + * batchParallelism threads are used concurrently. This controls memory + * usage and back-pressure when processing large document sets. * * The option is a: <code>int</code> type. * @@ -326,8 +329,11 @@ public interface DoclingEndpointBuilderFactory { return this; } /** - * Maximum number of documents to process in a single batch (batch - * operations only). + * Number of documents to submit per sub-batch. Documents are + * partitioned into sub-batches of this size and each sub-batch is + * processed before starting the next one. Within each sub-batch, up to + * batchParallelism threads are used concurrently. This controls memory + * usage and back-pressure when processing large document sets. * * The option will be converted to a <code>int</code> type. *
