[ https://issues.apache.org/jira/browse/CAMEL-10171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416897#comment-15416897 ]
onder sezgin commented on CAMEL-10171: -------------------------------------- let me explain what i did. I found out that i need to detect continuation expiry in CxfConsumer. I logged a ticket and it is partially fixed. (https://issues.apache.org/jira/browse/CXF-7002) Because it is not provided via Contunation interface. However, let's assume we have method access in Continuation interface (as i did in my local source) and in CxfConsumer.asyncInvoke method i can add if(continuation.isExpired()) { //... // handle camel exchange // } I know contunation is expired but i dont have the exchange. > Camel CXF expired continuations cause memory leak > ------------------------------------------------- > > Key: CAMEL-10171 > URL: https://issues.apache.org/jira/browse/CAMEL-10171 > Project: Camel > Issue Type: Bug > Components: camel-cxf > Affects Versions: 2.17.1 > Reporter: Damian Malczyk > > Looks like exchanges expired by CXF continuation timeout are being > accumulated in InflightRepository. Tested with Camel 2.17.1 and > cxf-rt-transports-http-jetty: > Dependencies: > {code}<dependencies> > <dependency> > <groupId>org.apache.camel</groupId> > <artifactId>camel-core</artifactId> > <version>2.17.1</version> > </dependency> > <dependency> > <groupId>org.apache.camel</groupId> > <artifactId>camel-cxf</artifactId> > <version>2.17.1</version> > </dependency> > <dependency> > <groupId>org.apache.cxf</groupId> > <artifactId>cxf-rt-transports-http-jetty</artifactId> > <version>3.1.5</version> > </dependency> > </dependencies>{code} > Reproducer: > {code}import org.apache.camel.CamelContext; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.component.cxf.CxfEndpoint; > import org.apache.camel.component.cxf.DataFormat; > import org.apache.camel.impl.DefaultCamelContext; > import org.springframework.util.StreamUtils; > import org.w3c.dom.Document; > import javax.xml.parsers.DocumentBuilder; > import javax.xml.parsers.DocumentBuilderFactory; > import javax.xml.soap.MessageFactory; > import javax.xml.soap.SOAPMessage; > import java.io.ByteArrayInputStream; > import java.io.ByteArrayOutputStream; > import java.net.HttpURLConnection; > import java.net.URL; > import java.util.Timer; > import java.util.TimerTask; > import java.util.concurrent.Executor; > import java.util.concurrent.Executors; > public class Sample { > private final static String URI = "http://127.0.0.1:8080/"; > private final static long CONTINUATION_TIMEOUT = 100L; > private final static long DELAYER_VALUE = 200L; > private final static int SENDER_THREADS = > Runtime.getRuntime().availableProcessors(); > private final static int MESSAGES_PER_SENDER = 10000; > private static void setupCamel() throws Exception { > final CamelContext camelContext = new DefaultCamelContext(); > final CxfEndpoint endpoint = (CxfEndpoint)camelContext.getEndpoint( > "cxf://" + URI ); > endpoint.setContinuationTimeout( CONTINUATION_TIMEOUT ); > endpoint.setDataFormat( DataFormat.PAYLOAD ); > camelContext.addRoutes( new RouteBuilder() { > public void configure() throws Exception { > from( endpoint ) > .threads() > .setBody( constant( "<ok />" ) ) > .delay( DELAYER_VALUE ) > .end(); > } > }); > final TimerTask repoSizeReporter = new TimerTask() { > public void run() { > System.out.println( "Inflight repository size: " + > camelContext.getInflightRepository().size() ); > System.gc(); > System.out.println( "Memory usage: " + > (Runtime.getRuntime().totalMemory() - > Runtime.getRuntime().freeMemory())/(1024*1024) + "MB" ); > } > }; > final Timer repoSizeReporterTimer = new Timer(); > repoSizeReporterTimer.schedule( repoSizeReporter, 1000, 1000 ); > camelContext.start(); > } > private static byte[] createSoapMessage() throws Exception { > final StringBuilder payloadBuilder = new StringBuilder( "<payload>" ); > for( int i = 0; i < 5000; i++ ) { > payloadBuilder.append( "<payloadElement />" ); > } > final String payload = payloadBuilder.append( "</payload>" > ).toString(); > final DocumentBuilder documentBuilder = > DocumentBuilderFactory.newInstance().newDocumentBuilder(); > final Document payloadDocument = documentBuilder.parse( new > ByteArrayInputStream( payload.getBytes() ) ); > final ByteArrayOutputStream soapOutStream = new > ByteArrayOutputStream(); > final SOAPMessage message = > MessageFactory.newInstance().createMessage(); > message.getSOAPBody().addDocument( payloadDocument ); > message.writeTo( soapOutStream ); > return soapOutStream.toByteArray(); > } > private static Runnable soapSender() { > return () -> { > try { > final byte[] soapMessage = createSoapMessage(); > for( int i = 0; i < MESSAGES_PER_SENDER; i++ ) { > final HttpURLConnection connection = > (HttpURLConnection)new URL( URI ).openConnection(); > connection.setDoOutput( true ); > connection.setRequestProperty( "Content-Type", "text/xml" > ); > connection.setRequestProperty( "SOAPAction", "\"\"" ); > connection.setRequestMethod( "POST" ); > connection.setRequestProperty( "Accept", "*/*" ); > connection.connect(); > StreamUtils.copy( soapMessage, > connection.getOutputStream() ); > connection.getResponseCode(); > connection.disconnect(); > } > } catch ( final Exception ex ) { > ex.printStackTrace(); > } > }; > } > public static void main(String[] args) throws Exception { > setupCamel(); > final Executor executor = Executors.newFixedThreadPool( > SENDER_THREADS ); > for( int i = 0; i < SENDER_THREADS; i++ ) { > executor.execute( soapSender() ); > } > } > }{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)