[ https://issues.apache.org/jira/browse/CAMEL-14327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17002757#comment-17002757 ]

Rafał Gała commented on CAMEL-14327:
------------------------------------

I think I got it.

Evictions are done in a separate thread. When the following method is executed:
{code:java}
    protected void onEvict(S s) {
        Endpoint e = getEndpoint.apply(s);
        Pool<S> p = pool.get(e);
        if (p != null) {
            if (p.evict(s)) {
                pool.remove(e);
            }
        } else {
            ServicePool.stop(s);
            try {
                e.getCamelContext().removeService(s);
            } catch (Exception ex) {
                LOG.error("Error removing service {}", s, ex);
            }
        }
    }
{code}
the call to *p.evict(s)* causes the *doStop()* method of the *Jt400PgmProducer* 
instance to be invoked. After *p.evict(s)* has returned, but before the 
*pool.remove(e)* call, the producer can still be acquired from the pool; because 
*doStop()* has already run and the *iSeries* field has been nulled, exchange 
processing then fails with an NPE.
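
The window can be reproduced with a simplified, hypothetical model of the pool. *SimplePool* and *MockProducer* below are illustrative names, not Camel classes; the point is only the ordering, where stopping the producer before removing it from the map leaves a moment in which a stopped instance is still reachable:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Jt400PgmProducer: doStop() nulls the connection,
// after which process() fails the same way the real producer does.
class MockProducer {
    Object iSeries = new Object();

    void doStop() {
        iSeries = null;
    }

    void process() {
        if (iSeries == null) {
            throw new NullPointerException("iSeries was nulled by doStop()");
        }
    }
}

// Hypothetical stand-in for the pool: onEvict() stops the producer first and
// removes it from the map second, so acquire() can still observe a stopped
// instance in between (and a caller that acquired earlier still holds one).
class SimplePool {
    final Map<String, MockProducer> producers = new ConcurrentHashMap<>();

    MockProducer acquire(String endpointKey) {
        return producers.get(endpointKey);
    }

    void onEvict(String endpointKey) {
        MockProducer p = producers.get(endpointKey);
        if (p != null) {
            p.doStop();                    // producer is now unusable...
            // <-- race window: acquire(endpointKey) still returns p here
            producers.remove(endpointKey); // ...but only now leaves the map
        }
    }
}
```

A caller that acquires the producer inside that window, or that already holds it, hits the same NPE as in the report.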

I set breakpoints in the *onEvict(S)* and *acquire(Endpoint)* methods of 
*ServicePool*; as the traces below show, the two threads evict and acquire the 
same *Jt400PgmProducer* instance at the same time.

 
{noformat}
Daemon Thread [ForkJoinPool.commonPool-worker-6] (Suspended (breakpoint at line 85 in ServicePool))
        ServicePool<S>.onEvict(S) line: 85
        146690937.accept(Object) line: not available
        CaffeineLRUCache<K,V>.onRemoval(K, V, RemovalCause) line: 239
        SSLMS<K,V>(BoundedLocalCache<K,V>).lambda$notifyRemoval$1(Object, Object, RemovalCause) line: 333
        1106690494.run() line: not available
        ForkJoinTask$RunnableExecuteAction.exec() line: 1402
        ForkJoinTask$RunnableExecuteAction(ForkJoinTask<V>).doExec() line: 289
        ForkJoinPool$WorkQueue.runTask(ForkJoinTask<?>) line: 1056
        ForkJoinPool.runWorker(ForkJoinPool$WorkQueue) line: 1692
        ForkJoinWorkerThread.run() line: 157
{noformat}

{noformat}
Daemon Thread [Camel (camel-1) thread #17 - seda://seda] (Suspended (breakpoint at line 113 in ServicePool))
        ServicePool<S>.acquire(Endpoint) line: 113
        DefaultProducerCache.acquireProducer(Endpoint) line: 112
        DefaultProducerCache.doInAsyncProducer(Endpoint, Exchange, AsyncCallback, ProducerCache$AsyncProducerCallback) line: 264
        SendProcessor.process(Exchange, AsyncCallback) line: 162
        RedeliveryErrorHandler$RedeliveryState.run() line: 476
        DefaultReactiveExecutor$Worker.schedule(Runnable, boolean, boolean, boolean) line: 185
        DefaultReactiveExecutor.scheduleMain(Runnable, String) line: 59
        Pipeline.process(Exchange, AsyncCallback) line: 87
        CamelInternalProcessor.process(Exchange, AsyncCallback) line: 228
        SedaConsumer.sendToConsumers(Exchange) line: 266
        SedaConsumer.doRun() line: 180
        SedaConsumer.run() line: 125
        RejectableThreadPoolExecutor(ThreadPoolExecutor).runWorker(ThreadPoolExecutor$Worker) line: 1149
        ThreadPoolExecutor$Worker.run() line: 624
        Thread.run() line: 748
{noformat}

Maybe *p.evict(s)* should be called after *pool.remove(e)*?
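
Using the same kind of simplified, hypothetical model (illustrative names, not the actual Camel code), the reordering would look roughly like the sketch below. Note that in the real *ServicePool* the return value of *p.evict(s)* decides whether *pool.remove(e)* runs at all, so a straight swap would need more care than this shows:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the suggested ordering: remove the producer from the
// map first, then stop it. Once removed, acquire() can no longer hand it out,
// so no new caller can obtain an instance whose connection is about to be nulled.
class ReorderedPool {
    static class Producer {
        Object connection = new Object();

        void doStop() {
            connection = null;
        }
    }

    final Map<String, Producer> producers = new ConcurrentHashMap<>();

    Producer acquire(String endpointKey) {
        return producers.get(endpointKey);
    }

    void onEvict(String endpointKey) {
        Producer p = producers.remove(endpointKey); // unreachable first
        if (p != null) {
            p.doStop(); // safe to stop: no new acquire() can return it
        }
    }
}
```

This closes the acquire-after-stop window for new callers, though a producer already handed out before eviction would still need separate protection (e.g. reference counting).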

> Jt400PgmProducer doStop() method called on actively used instance
> -----------------------------------------------------------------
>
>                 Key: CAMEL-14327
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14327
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-jt400
>    Affects Versions: 3.0.0
>            Reporter: Rafał Gała
>            Priority: Major
>
> Today I migrated to version 3.0.0 and there seems to be an issue with service 
> pooling for the Jt400PgmProducer.
>  
> Here's what I have:
>  
> {code:java}
>     from(
>         "seda:someName?concurrentConsumers=2&size=10")
>             
> .to("jt400://{{as400.user}}:{{as400.password}}@{{as400.host}}/QSYS.LIB/PROGRAM.LIB/KFKEVR.SRVPGM?fieldsLength=200,2000,4,8,8,1000&outputFieldsIdx=0,1,2,3,4,5&connectionPool=#as400ConnectionPool&format=binary&procedureName=RECEIVEEVENT");
> {code}
> When the concurrentConsumers attribute of the seda endpoint is set to 1, 
> everything works fine, but when it is greater than 1, the evict method of the 
> MultiplePool class appears to call the stop method on a Jt400PgmProducer 
> instance that is still in use (its process method is still being called). This 
> results in nulling the iSeries object inside the Jt400PgmProducer instance:
> {code:java}
>     @Override
>     protected void doStop() throws Exception {
>         if (iSeries != null) {
>             LOG.info("Releasing connection to {}", getISeriesEndpoint());
>             getISeriesEndpoint().releaseSystem(iSeries);
>             iSeries = null;
>         }
>     }
> {code}
> and when the process method is later called on this instance, it fails with an 
> NPE while constructing the ServiceProgramCall:
> {code:java}
>     @Override
>     public void process(Exchange exchange) throws Exception {
> ...
>             pgmCall = new ServiceProgramCall(iSeries);
> ...            
> {code}
>  
> I believe this may be related to producer caching in the ServicePool class, 
> perhaps some sort of key issue in the cache Map?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
