[
https://issues.apache.org/jira/browse/CAMEL-21400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17902026#comment-17902026
]
Claus Ibsen commented on CAMEL-21400:
-------------------------------------
The issue is that the aggregator in completion-size = 1 mode will always
trigger completion, and it will reuse the incoming thread to continue.
And because the incoming thread is doing a giant split, its stack depth grows
deeper and deeper.
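The mechanism can be illustrated outside Camel (plain Java; the class and
method names are hypothetical, and the recursion is a deliberate
simplification of how each synchronous completion adds frames on the
incoming thread):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StackDepthDemo {

    // Stand-in for the split -> aggregate -> output call chain: each
    // "completion" continues on the same thread, adding a stack frame
    // per item (a simplification, not actual Camel code).
    static void processSync(int remaining) {
        if (remaining == 0) {
            return;
        }
        processSync(remaining - 1); // continues on the caller's stack
    }

    public static void main(String[] args) throws Exception {
        // With enough items, reusing the incoming thread overflows its stack.
        try {
            processSync(1_000_000);
            System.out.println("sync: completed");
        } catch (StackOverflowError e) {
            System.out.println("sync: StackOverflowError");
        }

        // Handing each completion to an executor keeps the stack flat:
        // every item starts from a fresh, shallow frame on the worker thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 1_000_000; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("async: processed " + done.get());
    }
}
```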
You need to make the aggregator output run on its own thread, for example by adding
.executorService(Executors.newSingleThreadExecutor())
Or turn on parallel processing
.parallelProcessing()
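Applied to the reproducer's route, that would look roughly like this (a sketch
based on the attached test; only the {{executorService}} line is new):
{code:java}
from("direct:agg")
    .aggregate(constant("SINGLE_GROUP"), new GroupedExchangeAggregationStrategy())
        .completionSize(1)
        // run completed groups on a dedicated thread instead of
        // reusing the incoming (splitter) thread
        .executorService(Executors.newSingleThreadExecutor())
        .setBody((Exchange exchange) -> {
            List<Exchange> list = (List<Exchange>) exchange.getMessage().getBody();
            return list.stream().map(e -> e.getMessage().getBody().toString()).collect(joining("\n"));
        })
        .to(target);
{code}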
> StackOverflowError when processing files
> ----------------------------------------
>
> Key: CAMEL-21400
> URL: https://issues.apache.org/jira/browse/CAMEL-21400
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 4.7.0
> Reporter: Antoine DESSAIGNE
> Assignee: Claus Ibsen
> Priority: Minor
> Fix For: 4.10.0
>
> Attachments: SplitAggregateTest.java
>
>
> Hello everyone,
> I just discovered a {{StackOverflowError}} when reading files. Here's
> the smallest reproduction case I could find.
> {code:java}
> // Create a temp directory with a CSV file
> Path tempDirectory = Files.createTempDirectory("camel-test");
> try (BufferedWriter writer = Files.newBufferedWriter(tempDirectory.resolve("file1.csv"))) {
>     writer.write("fieldA,fieldB,fieldC,fieldD\n");
>     for (int i = 0; i < 20000; i++) {
>         writer.write("fieldA" + i + ",fieldB" + i + ",fieldC" + i + ",fieldD" + i + "\n");
>     }
> }
>
> // Seems to fail if the target producer extends DefaultProducer and works if it extends DefaultAsyncProducer
> String target = "file://output"; // this fails
> //String target = "log://speed?groupSize=1000"; // this works
>
> DefaultCamelContext context = new DefaultCamelContext();
> context.addRoutes(new RouteBuilder() {
>     @Override
>     public void configure() {
>         from("file://" + tempDirectory.toAbsolutePath() + "?noop=true").to("direct:read").log("Done!");
>         from("direct:read").unmarshal().csv().split(body()).to("direct:agg");
>         from("direct:agg").aggregate(constant("SINGLE_GROUP"), new GroupedExchangeAggregationStrategy())
>                 .completionSize(1)
>                 .setBody((Exchange exchange) -> {
>                     List<Exchange> list = (List<Exchange>) exchange.getMessage().getBody();
>                     return list.stream().map(e -> e.getMessage().getBody().toString()).collect(joining("\n"));
>                 })
>                 .to(target);
>     }
> });
> context.start();
> {code}
> -As mentioned in the example, it only seems to fail if the producer in the
> aggregation extends {{DefaultProducer}} and not {{DefaultAsyncProducer}}-
> It still fails after converting my component to {{DefaultAsyncProducer}}, so
> it's unrelated.
> Can you have a look? Thank you
--
This message was sent by Atlassian Jira
(v8.20.10#820010)