[ 
https://issues.apache.org/jira/browse/CAMEL-11609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102963#comment-16102963
 ] 

Daniel Baldes commented on CAMEL-11609:
---------------------------------------

We have a workaround: in our case we can specify fixed headers, which leads 
to this.adaptheaders == false, so the headers are never modified and the 
issue does not occur.

Our routing structure is roughly as follows. There is a "direct:persistCSV" 
route which uses marshal(univocityCsvDataFormat). This route may be called in 
parallel by multiple other routes (via to("direct:persistCSV")), which use 
split() and streaming(). Unfortunately, I don't know much about Camel and its 
threading model as I'm new to this project, so it is entirely possible that we 
are doing something wrong. The question remains whether this Marshaller is 
supposed to work in a multi-threaded environment at all. If it is, then this 
is an issue.

Using a synchronizedMap (with the iteration over it synchronized as well, 
since the exception is thrown by an iterator) would rule out 
ConcurrentModificationException as far as I can tell. But in order to not get 
jumbled headers, I think they should not be kept and updated in the Marshaller 
instance at all: either always read the headers from this.headers and never 
change it, or read the headers from the map into a local variable in the 
adaptheaders == true case. I don't know why the changing headers would need 
to be kept as Marshaller state.
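
To illustrate the second option, here is a minimal sketch of what I mean. It 
is not the actual Camel Marshaller: the class StatelessRowWriter, its 
constructor and its marshal() signature are made up for illustration, and it 
returns value lists instead of writing CSV. The point is only that the adapted 
headers live in a local variable of one marshal() call, so parallel calls on 
the same instance cannot see or modify each other's headers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the marshaller; NOT the real Camel class.
final class StatelessRowWriter {
    // Immutable after construction; null means "adapt headers per call".
    private final List<String> fixedHeaders;

    StatelessRowWriter(List<String> fixedHeaders) {
        this.fixedHeaders = fixedHeaders == null
                ? null
                : Collections.unmodifiableList(new ArrayList<>(fixedHeaders));
    }

    // Converts the rows of one message into ordered value lists.
    // All mutable header state is local to this call.
    List<List<Object>> marshal(List<Map<String, Object>> rows) {
        Set<String> headers = new LinkedHashSet<>();
        if (fixedHeaders != null) {
            headers.addAll(fixedHeaders);
        }
        List<List<Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            if (fixedHeaders == null) {
                // Adapt case: extend the local header set with new keys.
                headers.addAll(row.keySet());
            }
            List<Object> values = new ArrayList<>();
            for (String header : headers) {
                values.add(row.get(header)); // null if the row lacks the key
            }
            out.add(values);
        }
        return out;
    }
}
```

With something like this, one instance could be shared by all routes that end 
up in "direct:persistCSV", because the only field is never written after the 
constructor has run.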

> camel-univocity-parsers: marshaller not thread safe
> ---------------------------------------------------
>
>                 Key: CAMEL-11609
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11609
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-csv
>    Affects Versions: 2.19.1
>            Reporter: Daniel Baldes
>
> org.apache.camel.dataformat.univocity.Marshaller.java is not thread safe.
> When this.adaptheaders is true, this.headers is modified in the writeRow() 
> method. This can lead to ConcurrentModificationExceptions (see below) and 
> jumbled headers, occasionally.
> I use a {{UnivocityCsvDataFormat}} for marshalling CSV in a route which is 
> called in parallel. The DataFormat creates a Marshaller with adaptheaders == 
> true when headers are not specified in the format.
> {code}java.util.ConcurrentModificationException: null
>             at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>             at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>             at org.apache.camel.dataformat.univocity.Marshaller.writeRow(Marshaller.java:95)
>             at org.apache.camel.dataformat.univocity.Marshaller.marshal(Marshaller.java:67)
>             at org.apache.camel.dataformat.univocity.AbstractUniVocityDataFormat.marshal(AbstractUniVocityDataFormat.java:94)
>             at org.apache.camel.processor.MarshalProcessor.process(MarshalProcessor.java:69)
>             at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>             at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>             at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>             at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>             at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>             at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>             at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:62)
>             at org.apache.camel.processor.SendDynamicProcessor$1.doInAsyncProducer(SendDynamicProcessor.java:124)
>             at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
>             at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:119)
>             at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>             at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>             at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>             at org.apache.camel.processor.WireTapProcessor$1.call(WireTapProcessor.java:137)
>             at org.apache.camel.processor.WireTapProcessor$1.call(WireTapProcessor.java:133)
>             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>             at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
