I continue to test how address endpoint handles timeouts I get new issues, now 
it is  org.apache.synapse.core.axis2.SynapseCallbackReceiver

if we use the same scenario and send, for example 10 requests to the proxy at 
the same time...
Now after I added msgCtx.notifyAll(); all requests are successfully pass by 
PassThroughHttpSender in case of timeouts. However, some of them (like 3 out of 
10) get the following exception:

WARN - SynapseCallbackReceiver Synapse received a response for the request with 
message Id : urn:uuid:e30c5327-c23e-4254-9d3c-d93f02b9979b But a callback is 
not registered (anymore) to process this response

It looks like SynapseCallbackReceiver properly get info that there was a 
connection timeout. However, SynapseCallbackReceiver continue to listen to the 
connection response... I do not understand why it is implemented in this way 
and what is more important how to notify SynapseCallbackReceiver from 
PassThroughHttpSender that it is all, so stop listening for a callback...

can some help me with it?

Reagrds,
Nariman.


On 01/05/2016 03:16 PM, Nariman Abdullayev (TD) wrote:
Hi,


I think that org.apache.synapse.transport.passthru.PassThroughHttpSender which 
is default transport sender for http in wso2esb 4.8.1 (not sure about 4.9.0 
will check it later) does not return borrowed thread to worker pool on 
exception in some cases.

It seems to me that this happens when exception occurs in external sequence 
(not in proxy which is directly accepts incoming request). For example, it 
occurs when you implement store-and-forward pattern with store and message 
processor.

this is my simple WSO2ESB 4.8.1 configuration to reproduce the issue:

<?xml version="1.0" encoding="UTF-8"?>
<definitions 
xmlns="http://ws.apache.org/ns/synapse";<http://ws.apache.org/ns/synapse>>
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
      <parameter name="cachableDuration">15000</parameter>
   </registry>
   <import name="fileconnector"
           package="org.wso2.carbon.connector"
           status="enabled"/>
   <proxy name="ProxyTest"
          transports="http https"
          startOnLoad="true"
          trace="disable">
      <target>
         <inSequence>
            <property name="FORCE_SC_ACCEPTED"
                      value="true"
                      scope="axis2"
                      type="STRING"/>
            <log level="custom">
               <property name="text" value="store message"/>
            </log>
            <store messageStore="TestXMS"/>
            <log level="custom">
               <property name="text" value="message stored"/>
            </log>
         </inSequence>
         <outSequence/>
         <faultSequence/>
      </target>
   </proxy>
   <localEntry key="ESBInstance">Test<description/>
   </localEntry>
   <endpoint name="HTTPEndpoint">
      <http method="post" 
uri-template="http://localhost/index.php";<http://localhost/index.php>>
         <timeout>
            <duration>10</duration>
            <responseAction>fault</responseAction>
         </timeout>
         <suspendOnFailure>
            <errorCodes>-1</errorCodes>
            <initialDuration>0</initialDuration>
            <progressionFactor>1.0</progressionFactor>
            <maximumDuration>0</maximumDuration>
         </suspendOnFailure>
         <markForSuspension>
            <errorCodes>-1</errorCodes>
         </markForSuspension>
      </http>
   </endpoint>
   <sequence name="fault">
      <log level="full">
         <property name="MESSAGE" value="Executing default 'fault' sequence"/>
         <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
         <property name="ERROR_MESSAGE" 
expression="get-property('ERROR_MESSAGE')"/>
      </log>
      <drop/>
   </sequence>
   <sequence name="TestSequence">
      <log level="full">
         <property name="text" value="message recieved"/>
      </log>
      <call>
         <endpoint key="HTTPEndpoint"/>
      </call>
      <log level="full">
         <property name="text" value="message processed"/>
      </log>
   </sequence>
   <sequence name="main">
      <in>
         <log level="full"/>
         <filter source="get-property('To')" regex="http://localhost:9000.*";>
            <send/>
         </filter>
      </in>
      <out>
         <send/>
      </out>
      <description>The main sequence for the message mediation</description>
   </sequence>
   <messageStore name="TestXMS"/>
   <messageProcessor 
class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor"
                     name="TestMP"
                     messageStore="TestXMS">
      <parameter name="interval">1000</parameter>
      <parameter name="sequence">TestSequence</parameter>
      <parameter name="concurrency">1</parameter>
      <parameter name="is.active">true</parameter>
   </messageProcessor>
</definitions>



configure endpoint to send request to nowhere and just call TestProxy 20 time 
or more in order to exhaust internal pool of worker threads...

after 20th request is received, new requests will be accepted and stored in 
store but worker pool will be exhausted and messages will not be retried from 
message store.

I think that there must be source code somewhere available for 
PassThroughHttpSender... but it was quicker to decompile 
repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar and look 
inside to see the reason of the problem.

we should look inside sendRequestContent method:

synchronized (msgContext)
      {
        while 
((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE")))
 && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) {
          try
          {
            log.info("msgContext before wait");
            msgContext.wait();
            log.info("msgContext after wait");
          }
          catch (InterruptedException e)
          {
            e.printStackTrace();
          }
        }



this is where the process stack on error. When exception occurs (in another 
thread) it "forgets" to notify current thread and msgContext waits forever 
(actually till server restart)

so I slightly modify another class in the same package - DeliveryAgent, method 
errorConnecting. this method is used to catch a callback from a thread used to 
connect to a target host... so when we catch a callback we notify msgContext 
and inform it to continue by adding new synchronize block after 
targetErrorHandler...

public void errorConnecting(HttpRoute route, int errorCode, String message)
  {
    Queue<MessageContext> queue = (Queue)this.waitingMessages.get(route);
    if (queue != null)
    {
      MessageContext msgCtx = (MessageContext)queue.poll();
      if (msgCtx != null) {
        this.targetErrorHandler.handleError(msgCtx, errorCode, "Error 
connecting to the back end", null, ProtocolState.REQUEST_READY);
        synchronized (msgCtx)
        {
            log.info("errorConnecting: notify message context about error");
            msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
            msgCtx.notifyAll();
        }
      }
    }
    else
    {
      throw new IllegalStateException("Queue cannot be null for: " + route);
    }
  }



I have done some tests and it looks like it fixes the problem of "dead" 
threads. However, I am not sure if it is a proper fix or not... any suggestions 
are welcomed..

decompiled and modified source files are attached

Regards,
Nariman.




_______________________________________________
Dev mailing list
[email protected]<mailto:[email protected]>
http://wso2.org/cgi-bin/mailman/listinfo/dev


_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to