[ 
https://issues.apache.org/jira/browse/CAMEL-15723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437261#comment-17437261
 ] 

Benjamin BONNET edited comment on CAMEL-15723 at 11/2/21, 10:18 AM:
--------------------------------------------------------------------

Hi,

We reproduced this issue on a heavily loaded Karaf with Camel 3.11.2. This time,
our aggregate has its own table. It is configured with optimisticLocking=true.


There seems to be a race condition with optimistic locking when two exchanges
with the same correlation ID are processed at the same time and the aggregator
does not yet contain an aggregation for that correlation ID.


How does it occur?
Take two threads, Th1 and Th2, holding exchange1 and exchange2 with the same
correlationId key. If the following sequence occurs, you get a
NullPointerException at line 171.


1. Th1: invokes doAggregation(key, exchange1) on the aggregate processor
   (AggregateProcessor line 477).
2. Th1: invokes get(key) on the repo (AggregateProcessor line 484) => returns
   null.
3. Th2: invokes doAggregation(key, exchange2); this is possible since there is
   no lock.
4. Th2: invokes get(key) on the repo => returns null, so the
   CamelOptimisticLockVersion property on exchange2 remains unset.
5. Th1: invokes add on the repo (JdbcAggregationRepository line 152) => insert
   OK, since no row with that key is present.
6. Th2: invokes add on the repo => one row is present (the one added by Th1),
   but exchange2.getProperty(CamelOptimisticLockVersion) is null, which causes
   the NullPointerException at line 171.
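
The interleaving above can be replayed deterministically without threads. The following is only a sketch under stated assumptions: it does not use Camel at all, the class name, the in-memory `table` map and the simplified `get`/`add` methods are hypothetical stand-ins, and the only thing taken from the report is the property name and the order of the calls.

```java
import java.util.HashMap;
import java.util.Map;

public class OptimisticLockRaceSketch {
    static final String VERSION_PROPERTY = "CamelOptimisticLockVersion";

    // Hypothetical stand-in for the aggregation table: key -> stored version.
    static final Map<String, Long> table = new HashMap<>();

    // Simplified get(key): returns the stored version, or null when no row exists.
    // Assumption: the version is only copied onto the exchange when a row is found.
    static Long get(String key, Map<String, Object> exchangeProps) {
        Long version = table.get(key);
        if (version != null) {
            exchangeProps.put(VERSION_PROPERTY, version);
        }
        return version;
    }

    // Simplified add(key, exchange): insert when no row is present,
    // otherwise update using the version carried by the exchange.
    static void add(String key, Map<String, Object> exchangeProps) {
        if (!table.containsKey(key)) {
            table.put(key, 1L);
        } else {
            // Auto-unboxing a null Long into a primitive long throws NullPointerException.
            long version = (Long) exchangeProps.get(VERSION_PROPERTY);
            table.put(key, version + 1);
        }
    }

    // Replays the six steps in order; returns true if Th2's add fails with an NPE.
    static boolean replay() {
        table.clear();
        Map<String, Object> exchange1 = new HashMap<>();
        Map<String, Object> exchange2 = new HashMap<>();
        get("key", exchange1);     // Th1: returns null
        get("key", exchange2);     // Th2: returns null, version stays unset
        add("key", exchange1);     // Th1: insert OK
        try {
            add("key", exchange2); // Th2: row present, version is null
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE reproduced: " + replay());
    }
}
```

Running the steps sequentially is enough here because the failure depends only on the order of the four repository calls, not on real concurrency.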


> NPE when using PostgresAggregationRepository
> --------------------------------------------
>
>                 Key: CAMEL-15723
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15723
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sql
>    Affects Versions: 3.4.2
>         Environment: * Apache Camel 3.4.2
>  * Spring Boot 2.3.4.RELEASE
>  * Postgres database as an aggregate repository
>            Reporter: Corina Roman
>            Priority: Major
>             Fix For: 3.x
>
>
> Hi,
> we are getting production errors (NPE) when using a Postgres aggregate 
> repository and Camel code attempts to write to that table (under concurrent 
> use):
> {noformat}
> org.apache.camel.RuntimeCamelException: java.lang.RuntimeException: Error adding to repository lb_data_uploads with key ***
>  at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:52)
>  at org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository.add(JdbcAggregationRepository.java:142)
>  at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregationRepositoryAdd(AggregateProcessor.java:644)
>  at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:598)
>  at org.apache.camel.processor.aggregate.AggregateProcessor.doProcess(AggregateProcessor.java:406)
>  at org.apache.camel.processor.aggregate.AggregateProcessor.doInOptimisticLock(AggregateProcessor.java:372)
>  at org.apache.camel.processor.aggregate.AggregateProcessor.doProcess(AggregateProcessor.java:362)
>  at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:320)
>  at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:702)
>  at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:616)
>  at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
>  at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
>  at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
>  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286)
>  at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
>  at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40)
>  at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:346)
>  at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222)
>  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.RuntimeException: Error adding to repository lb_data_uploads with key ***
>  at org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository$1.doInTransaction(JdbcAggregationRepository.java:176)
>  at org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository$1.doInTransaction(JdbcAggregationRepository.java:149)
>  at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
>  at org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository.add(JdbcAggregationRepository.java:149)
>  at org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository.add(JdbcAggregationRepository.java:137)
>  ... 21 common frames omitted
> Caused by: java.lang.NullPointerException: null
>  at org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository$1.doInTransaction(JdbcAggregationRepository.java:167)
>  ... 25 common frames omitted
> {noformat}
>  
> The problematic code seems to be at *JdbcAggregationRepository.java* line 167:
> {noformat}
> private static final String VERSION_PROPERTY = "CamelOptimisticLockVersion";
> ....
> long version = exchange.getProperty(VERSION_PROPERTY, Long.class);
> {noformat}
>  
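
The quoted line assigns the result of a `Long.class` lookup to a primitive `long`, so a missing property is auto-unboxed from null and surfaces as a bare NullPointerException. A minimal sketch of a null-checked read follows. It assumes a plain `Map` in place of the Camel exchange, and the helper name `requireVersion` and the choice of `IllegalStateException` are hypothetical; the report does not say which exception the real repository should raise.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionReadSketch {
    static final String VERSION_PROPERTY = "CamelOptimisticLockVersion";

    // Hypothetical helper: reads the version as a boxed Long first, so a missing
    // property produces a descriptive error instead of an unboxing NPE.
    static long requireVersion(Map<String, Object> exchangeProps, String key) {
        Long version = (Long) exchangeProps.get(VERSION_PROPERTY);
        if (version == null) {
            throw new IllegalStateException(
                "No " + VERSION_PROPERTY + " on exchange for key " + key
                + "; a row may have been inserted concurrently");
        }
        return version;
    }

    public static void main(String[] args) {
        Map<String, Object> withVersion = new HashMap<>();
        withVersion.put(VERSION_PROPERTY, 3L);
        System.out.println("version = " + requireVersion(withVersion, "key"));

        try {
            requireVersion(new HashMap<>(), "key");
        } catch (IllegalStateException e) {
            System.out.println("missing version detected: " + e.getMessage());
        }
    }
}
```

The check only makes the failure explicit; it does not by itself remove the race between the two threads.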



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
