[
https://issues.apache.org/jira/browse/CAMEL-11609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102963#comment-16102963
]
Daniel Baldes commented on CAMEL-11609:
---------------------------------------
We have a workaround because in this case, we can specify fixed headers (which
leads to this.adaptheaders == false, i.e. headers are not modified, so no
issue).
Our routing structure is roughly as follows. There is a "direct:persistCSV"
route which uses marshal(univocityCsvDataFormat). This route may be called in
parallel by multiple other routes (via to("direct:persistCSV")), these use
split() and streaming(). Unfortunately I don't know much about camel and its
threading model as I'm new to this project, so it is entirely possible that we
are doing something wrong. So the question remains if this Marshaller is
supposed to work in a multi-threaded environment at all - if it is, then this
is an issue.
Simply using a synchronizedMap would rule out ConcurrentModificationException
as far as I can tell, but in order to not get jumbled headers, they should not
be kept and updated in the Marshaller instance at all I think - either always
read the headers from this.headers and never change it, or read headers from
the map into a local variable in the adaptheaders == true case. I don't know
why changing headers would need to be kept as Marshaller state.
> camel-univocity-parsers: marshaller not thread safe
> ---------------------------------------------------
>
> Key: CAMEL-11609
> URL: https://issues.apache.org/jira/browse/CAMEL-11609
> Project: Camel
> Issue Type: Bug
> Components: camel-csv
> Affects Versions: 2.19.1
> Reporter: Daniel Baldes
>
> org.apache.camel.dataformat.univocity.Marshaller.java is not thread safe.
> When this.adaptheaders is true, this.headers is modified in the wirteRow()
> method. This can lead to ConcurrentModificationExceptions (see below) and
> jumbled headers, occasionally.
> I use a {{UnivocityCsvDataFormat}} for marshalling CSV in a route which is
> called in parallel. The DataFormat creates a Marshaller with adaptheaders ==
> true when headers are not specified in the format.
> {code}java.util.ConcurrentModificationException: null
> at
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at
> org.apache.camel.dataformat.univocity.Marshaller.writeRow(Marshaller.java:95)
> at
> org.apache.camel.dataformat.univocity.Marshaller.marshal(Marshaller.java:67)
> at
> org.apache.camel.dataformat.univocity.AbstractUniVocityDataFormat.marshal(AbstractUniVocityDataFormat.java:94)
> at
> org.apache.camel.processor.MarshalProcessor.process(MarshalProcessor.java:69)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> at
> org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:62)
> at
> org.apache.camel.processor.SendDynamicProcessor$1.doInAsyncProducer(SendDynamicProcessor.java:124)
> at
> org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
> at
> org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:119)
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
> at
> org.apache.camel.processor.WireTapProcessor$1.call(WireTapProcessor.java:137)
> at
> org.apache.camel.processor.WireTapProcessor$1.call(WireTapProcessor.java:133)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)