Damian Malczyk created CAMEL-10171:
--------------------------------------
Summary: Camel CXF expired continuations cause memory leak
Key: CAMEL-10171
URL: https://issues.apache.org/jira/browse/CAMEL-10171
Project: Camel
Issue Type: Bug
Components: camel-cxf
Affects Versions: 2.17.1
Reporter: Damian Malczyk
It looks like exchanges that expire because of the CXF continuation timeout
accumulate in the InflightRepository. Tested with Camel 2.17.1 and
cxf-rt-transports-http-jetty 3.1.5.
Dependencies:
{code}<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cxf</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>3.1.5</version>
</dependency>
</dependencies>{code}
Reproducer:
{code}import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.cxf.CxfEndpoint;
import org.apache.camel.component.cxf.DataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.springframework.util.StreamUtils;
import org.w3c.dom.Document;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.SOAPMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Sample {
private final static String URI = "http://127.0.0.1:8080/";
private final static long CONTINUATION_TIMEOUT = 100L;
private final static long DELAYER_VALUE = 200L;
private final static int SENDER_THREADS =
Runtime.getRuntime().availableProcessors();
private final static int MESSAGES_PER_SENDER = 10000;
/**
 * Creates the Camel context for the reproducer, wires a CXF endpoint into a
 * deliberately slow route, starts a periodic reporter and finally starts the
 * context.
 *
 * <p>The route delays every exchange by {@code DELAYER_VALUE} (200 ms), which
 * is longer than the {@code CONTINUATION_TIMEOUT} (100 ms) configured on the
 * endpoint -- presumably so that the CXF continuation expires before the route
 * completes.
 *
 * @throws Exception if the route cannot be added or the context fails to start
 */
private static void setupCamel() throws Exception {
    final CamelContext context = new DefaultCamelContext();

    // PAYLOAD-mode CXF endpoint listening on URI with a short continuation timeout.
    final CxfEndpoint cxfEndpoint = (CxfEndpoint) context.getEndpoint("cxf://" + URI);
    cxfEndpoint.setContinuationTimeout(CONTINUATION_TIMEOUT);
    cxfEndpoint.setDataFormat(DataFormat.PAYLOAD);

    final RouteBuilder slowRoute = new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from(cxfEndpoint)
                .threads()
                .setBody(constant("<ok />"))
                .delay(DELAYER_VALUE)
                .end();
        }
    };
    context.addRoutes(slowRoute);

    // Prints the in-flight exchange count and the used heap once per second.
    final TimerTask reporter = new TimerTask() {
        @Override
        public void run() {
            System.out.println("Inflight repository size: "
                + context.getInflightRepository().size());
            // Request a GC first so the figure below is less dominated by garbage.
            System.gc();
            final Runtime runtime = Runtime.getRuntime();
            final long usedBytes = runtime.totalMemory() - runtime.freeMemory();
            System.out.println("Memory usage: " + (usedBytes / (1024 * 1024)) + "MB");
        }
    };
    // First report after 1000 ms, then every 1000 ms.
    new Timer().schedule(reporter, 1000, 1000);

    context.start();
}
/**
 * Builds the serialized SOAP request used by every sender.
 *
 * <p>The SOAP body holds a single {@code <payload>} element with 5000 empty
 * {@code <payloadElement />} children.
 *
 * @return the SOAP message as written by {@code SOAPMessage.writeTo}
 * @throws Exception if the payload cannot be parsed or the SOAP message
 *         cannot be created or serialized
 */
private static byte[] createSoapMessage() throws Exception {
    final StringBuilder xml = new StringBuilder("<payload>");
    for (int i = 0; i < 5000; i++) {
        xml.append("<payloadElement />");
    }
    xml.append("</payload>");

    // The XML carries no encoding declaration, so encode it explicitly as UTF-8
    // instead of relying on the platform default charset.
    final byte[] payloadBytes =
        xml.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

    final DocumentBuilder documentBuilder =
        DocumentBuilderFactory.newInstance().newDocumentBuilder();
    final Document payloadDocument =
        documentBuilder.parse(new ByteArrayInputStream(payloadBytes));

    // Wrap the parsed payload in an empty SOAP message and serialize it.
    final SOAPMessage message = MessageFactory.newInstance().createMessage();
    message.getSOAPBody().addDocument(payloadDocument);

    final ByteArrayOutputStream soapOutStream = new ByteArrayOutputStream();
    message.writeTo(soapOutStream);
    return soapOutStream.toByteArray();
}
/**
 * Creates a task that POSTs the same SOAP message to {@code URI}
 * {@code MESSAGES_PER_SENDER} times, one request after another.
 *
 * <p>Any exception ends the task: it is printed to stderr and the remaining
 * requests are not sent.
 *
 * @return the sender task
 */
private static Runnable soapSender() {
    return () -> {
        try {
            // Build the message once; every request reuses the same bytes.
            final byte[] soapMessage = createSoapMessage();
            for (int i = 0; i < MESSAGES_PER_SENDER; i++) {
                final HttpURLConnection connection =
                    (HttpURLConnection) new URL(URI).openConnection();
                try {
                    connection.setDoOutput(true);
                    connection.setRequestProperty("Content-Type", "text/xml");
                    connection.setRequestProperty("SOAPAction", "\"\"");
                    connection.setRequestMethod("POST");
                    connection.setRequestProperty("Accept", "*/*");
                    connection.connect();
                    // Close the request body stream even if the copy fails.
                    try (java.io.OutputStream requestBody = connection.getOutputStream()) {
                        StreamUtils.copy(soapMessage, requestBody);
                    }
                    // The status code itself is ignored; the call waits for the reply.
                    connection.getResponseCode();
                } finally {
                    // Always release the connection, including on failure.
                    connection.disconnect();
                }
            }
        } catch (final Exception ex) {
            ex.printStackTrace();
        }
    };
}
/**
 * Entry point of the reproducer: starts Camel, then runs
 * {@code SENDER_THREADS} concurrent SOAP senders.
 *
 * @param args ignored
 * @throws Exception if the Camel setup fails
 */
public static void main(String[] args) throws Exception {
    setupCamel();

    final java.util.concurrent.ExecutorService executor =
        Executors.newFixedThreadPool(SENDER_THREADS);
    try {
        for (int i = 0; i < SENDER_THREADS; i++) {
            executor.execute(soapSender());
        }
    } finally {
        // Already-submitted senders still run to completion; shutdown only lets
        // the pool threads terminate afterwards instead of idling forever.
        // Threads started by setupCamel may still keep the JVM alive.
        executor.shutdown();
    }
}
}{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)