I continue to test how address endpoint handles timeouts I get new issues, now it is org.apache.synapse.core.axis2.SynapseCallbackReceiver
if we use the same scenario and send, for example 10 requests to the proxy at the same time... Now after I added msgCtx.notifyAll(); all requests are successfully pass by PassThroughHttpSender in case of timeouts. However, some of them (like 3 out of 10) get the following exception: WARN - SynapseCallbackReceiver Synapse received a response for the request with message Id : urn:uuid:e30c5327-c23e-4254-9d3c-d93f02b9979b But a callback is not registered (anymore) to process this response It looks like SynapseCallbackReceiver properly get info that there was a connection timeout. However, SynapseCallbackReceiver continue to listen to the connection response... I do not understand why it is implemented in this way and what is more important how to notify SynapseCallbackReceiver from PassThroughHttpSender that it is all, so stop listening for a callback... can some help me with it? Reagrds, Nariman. On 01/05/2016 03:16 PM, Nariman Abdullayev (TD) wrote: Hi, I think that org.apache.synapse.transport.passthru.PassThroughHttpSender which is default transport sender for http in wso2esb 4.8.1 (not sure about 4.9.0 will check it later) does not return borrowed thread to worker pool on exception in some cases. It seems to me that this happens when exception occurs in external sequence (not in proxy which is directly accepts incoming request). For example, it occurs when you implement store-and-forward pattern with store and message processor. this is my simple WSO2ESB 4.8.1 configuration to reproduce the issue: <?xml version="1.0" encoding="UTF-8"?> <definitions xmlns="http://ws.apache.org/ns/synapse"<http://ws.apache.org/ns/synapse>> <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry"> <parameter name="cachableDuration">15000</parameter> </registry> <import name="fileconnector" package="org.wso2.carbon.connector" status="enabled"/> <proxy name="ProxyTest" transports="http https" startOnLoad="true" trace="disable"> <target> <inSequence> <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/> <log level="custom"> <property name="text" value="store message"/> </log> <store messageStore="TestXMS"/> <log level="custom"> <property name="text" value="message stored"/> </log> </inSequence> <outSequence/> <faultSequence/> </target> </proxy> <localEntry key="ESBInstance">Test<description/> </localEntry> <endpoint name="HTTPEndpoint"> <http method="post" uri-template="http://localhost/index.php"<http://localhost/index.php>> <timeout> <duration>10</duration> <responseAction>fault</responseAction> </timeout> <suspendOnFailure> <errorCodes>-1</errorCodes> <initialDuration>0</initialDuration> <progressionFactor>1.0</progressionFactor> <maximumDuration>0</maximumDuration> </suspendOnFailure> <markForSuspension> <errorCodes>-1</errorCodes> </markForSuspension> </http> </endpoint> <sequence name="fault"> <log level="full"> <property name="MESSAGE" value="Executing default 'fault' sequence"/> <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/> <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/> </log> <drop/> </sequence> <sequence name="TestSequence"> <log level="full"> <property name="text" value="message recieved"/> </log> <call> <endpoint key="HTTPEndpoint"/> </call> <log level="full"> <property name="text" value="message processed"/> </log> </sequence> <sequence name="main"> <in> <log level="full"/> <filter source="get-property('To')" regex="http://localhost:9000.*"> <send/> </filter> </in> <out> <send/> </out> <description>The main sequence for the message mediation</description> </sequence> <messageStore name="TestXMS"/> <messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" name="TestMP" messageStore="TestXMS"> <parameter name="interval">1000</parameter> <parameter name="sequence">TestSequence</parameter> <parameter name="concurrency">1</parameter> <parameter name="is.active">true</parameter> </messageProcessor> </definitions> configure endpoint to send request to nowhere and just call TestProxy 20 time or more in order to exhaust internal pool of worker threads... after 20th request is received, new requests will be accepted and stored in store but worker pool will be exhausted and messages will not be retried from message store. I think that there must be source code somewhere available for PassThroughHttpSender... but it was quicker to decompile repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar and look inside to see the reason of the problem. we should look inside sendRequestContent method: synchronized (msgContext) { while ((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE"))) && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) { try { log.info("msgContext before wait"); msgContext.wait(); log.info("msgContext after wait"); } catch (InterruptedException e) { e.printStackTrace(); } } this is where the process stack on error. When exception occurs (in another thread) it "forgets" to notify current thread and msgContext waits forever (actually till server restart) so I slightly modify another class in the same package - DeliveryAgent, method errorConnecting. this method is used to catch a callback from a thread used to connect to a target host... so when we catch a callback we notify msgContext and inform it to continue by adding new synchronize block after targetErrorHandler... public void errorConnecting(HttpRoute route, int errorCode, String message) { Queue<MessageContext> queue = (Queue)this.waitingMessages.get(route); if (queue != null) { MessageContext msgCtx = (MessageContext)queue.poll(); if (msgCtx != null) { this.targetErrorHandler.handleError(msgCtx, errorCode, "Error connecting to the back end", null, ProtocolState.REQUEST_READY); synchronized (msgCtx) { log.info("errorConnecting: notify message context about error"); msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE); msgCtx.notifyAll(); } } } else { throw new IllegalStateException("Queue cannot be null for: " + route); } } I have done some tests and it looks like it fixes the problem of "dead" threads. However, I am not sure if it is a proper fix or not... any suggestions are welcomed.. decompiled and modified source files are attached Regards, Nariman. _______________________________________________ Dev mailing list [email protected]<mailto:[email protected]> http://wso2.org/cgi-bin/mailman/listinfo/dev
_______________________________________________ Dev mailing list [email protected] http://wso2.org/cgi-bin/mailman/listinfo/dev
