[
https://issues.apache.org/jira/browse/CAMEL-22127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kumar Gaurav updated CAMEL-22127:
---------------------------------
Description:
h3. What happened
A {{ConcurrentModificationException}} is thrown inside *Camel’s Vert.x WebSocket producer* while it traverses the collection that holds every live peer:
{{org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort}}
{{findPeersForHostPort()}} does
{{return connectedPeers.values() // Map<String,List<...>>
.stream() // ← another thread can mutate
.flatMap(List::stream) // its internal ArrayList
.collect(Collectors.toList());}}
While the stream is walking one of the internal {{ArrayList}}s, *another thread adds or removes a WebSocket* (OPEN / CLOSE), so the list's spliterator detects the change and throws {{ConcurrentModificationException}}.
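The failure mode can be reproduced deterministically on a single thread. The sketch below is not the Camel code: the class name {{CmeRepro}}, the {{String}} peers and the map key are placeholders, and the {{peek()}} call only stands in for the second thread that registers a peer mid-traversal.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class CmeRepro {
    // Flattens a Map<String, List<String>> the same way as the snippet above.
    // The peek() stage mutates the inner ArrayList during traversal, which is
    // what a concurrent OPEN / CLOSE does in the real application.
    static boolean flattenWhileMutating() {
        Map<String, List<String>> peers = new ConcurrentHashMap<>();
        peers.put("localhost:8080", new ArrayList<>(List.of("peer-1", "peer-2")));
        try {
            peers.values().stream()
                    .flatMap(list -> list.stream().peek(p -> {
                        if (p.equals("peer-1")) {
                            list.add("peer-3"); // structural change mid-traversal
                        }
                    }))
                    .collect(Collectors.toList());
            return false;
        } catch (ConcurrentModificationException e) {
            // Raised by the ArrayList spliterator once it sees the changed modCount.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + flattenWhileMutating());
    }
}
```

With real threads the same check fails only when the mutation happens to land inside the traversal, which is why the problem is intermittent.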
----
h3. How it happens
getVertxHostRegistry().values() // Map.values() → mutable collection
.stream()
~~~~
.flatMap(host -> host.getConnectedPeers().stream()) // ← each host keeps
// a mutable List
{{host.getConnectedPeers()}} returns a list backed by an {{ArrayList}}:
!image-2025-05-29-17-27-30-528.png!
{{Collections.synchronizedList(new ArrayList<>())}} only *serialises each method call* ({{add}}, {{remove}}, {{get}}); *its iterator is _not_ thread-safe*.
If one thread iterates while another thread mutates the list, you still get {{ConcurrentModificationException}}.
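A minimal sketch of this behaviour, assuming plain {{String}} peers and a hypothetical class name: streaming over the synchronized wrapper still fails when the list changes mid-traversal, whereas copying the list while holding its monitor (the pattern the {{Collections.synchronizedList}} Javadoc prescribes for iteration) gives a snapshot that can be walked safely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SynchronizedListIteration {
    // Streams over the wrapper and mutates it mid-traversal; the add() call
    // stands in for a second thread registering a peer.
    static boolean unsafeTraversalThrows() {
        List<String> peers =
                Collections.synchronizedList(new ArrayList<>(List.of("peer-1", "peer-2")));
        try {
            peers.stream().forEach(p -> peers.add("late-peer"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copies the list while holding its monitor, so no writer can interleave.
    static List<String> snapshot(List<String> peers) {
        synchronized (peers) {
            return new ArrayList<>(peers);
        }
    }

    public static void main(String[] args) {
        System.out.println("unsafe traversal throws CME: " + unsafeTraversalThrows());

        List<String> peers =
                Collections.synchronizedList(new ArrayList<>(List.of("peer-1", "peer-2")));
        for (String p : snapshot(peers)) {
            peers.remove(p); // safe: the loop walks the copy, not the live list
        }
        System.out.println("remaining peers: " + peers.size());
    }
}
```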
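*How to fix:*
{{private final List<VertxWebsocketPeer> connectedPeers = new CopyOnWriteArrayList<>();}}
{{CopyOnWriteArrayList}} makes reads cheap and writes expensive, but its iteration is safe under concurrent modification, which makes it a better fit here than a synchronized {{ArrayList}}.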
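The effect of the proposed change can be sketched as follows. This is an illustration under assumptions, not the Camel patch: {{String}} stands in for {{VertxWebsocketPeer}} and the class and method names are made up. A stream over a {{CopyOnWriteArrayList}} walks the array snapshot taken when the traversal starts, so additions made meanwhile neither appear in the result nor cause an exception.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

public class CopyOnWritePeers {
    // Collects the peers while adding new ones from inside the pipeline;
    // the add() call stands in for a concurrent writer thread.
    static List<String> collectWhileMutating(List<String> peers) {
        return peers.stream()
                .peek(p -> peers.add("late-" + p))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> peers = new CopyOnWriteArrayList<>(List.of("peer-1", "peer-2"));
        List<String> seen = collectWhileMutating(peers);
        System.out.println("seen during traversal: " + seen);
        System.out.println("live list afterwards:  " + peers);
    }
}
```

Every write copies the backing array, so this trade-off suits a list that is read far more often than peers connect or disconnect.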
h3. Why it shows up only under load
* With *1,000 connections* and *32 writer threads*, the producer is called thousands of times per second (worker threads iterate peers), while Vert.x I/O threads register and unregister peers concurrently.
* Under light traffic the race window is tiny, so the exception goes unnoticed.
* *Root cause*: the Vert.x WebSocket producer iterates an {{ArrayList}} while another thread mutates it.
* *Quick workaround*: funnel writes through a single consumer queue.
Raw stacktrace:
10:46:02.423 [Camel (camel-1) thread #31 - seda://outbound-delivery] WARN com.bp.ihub.template - Failed delivery for (MessageId: 0B49011231DA68D-0000000000560726 on ExchangeId: 0B49011231DA68D-000000000056154B). On delivery attempt: 0 caught: java.util.ConcurrentModificationException
java.util.ConcurrentModificationException: null
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1631)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint.findPeersForHostPort(VertxWebsocketEndpoint.java:234)
at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.getConnectedPeers(VertxWebsocketProducer.java:109)
at org.apache.camel.component.vertx.websocket.VertxWebsocketProducer.process(VertxWebsocketProducer.java:62)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:210)
at org.apache.camel.management.DefaultInstrumentationProcessor.process(DefaultInstrumentationProcessor.java:90)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:840)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:746)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:354)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:330)
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:270)
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:188)
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)
at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at io.opentelemetry.context.Context.lambda$wrap$1(Context.java:241)
at java.base/java.lang.Thread.run(Thread.java:840)
> ConcurrentModificationException is coming inside Camel’s Vert.x WebSocket
> -------------------------------------------------------------------------
>
> Key: CAMEL-22127
> URL: https://issues.apache.org/jira/browse/CAMEL-22127
> Project: Camel
> Issue Type: Bug
> Components: camel-platform-http-vertx
> Affects Versions: 4.4.5
> Reporter: Kumar Gaurav
> Priority: Major
> Attachments: image-2025-05-29-17-27-30-528.png
>
>