Bob Browning created CAMEL-9143:
-----------------------------------
Summary: Producers that implement the ServicePoolAware interface
cause memory leak due to JMX references
Key: CAMEL-9143
URL: https://issues.apache.org/jira/browse/CAMEL-9143
Project: Camel
Issue Type: Bug
Components: camel-core
Affects Versions: 2.15.1, 2.15.0, 2.14.3, 2.14.2, 2.14.1
Reporter: Bob Browning
h4. Description
Producer instances that implement the ServicePoolAware interface will leak
memory if their route is stopped, with new producers being leaked every time
the route is started/stopped.
Known implementations that are affected are RemoteFileProducer (ftp, sftp) and
Mina2Producer.
This is due to the behaviour that the SendProcessor which when the route is
stopped it shuts down it's `producerCache` instance.
{code}
protected void doStop() throws Exception {
ServiceHelper.stopServices(producerCache, producer);
}
{code}
this in turn calls `stopAndShutdownService(pool)` which will call stop on the
SharedProducerServicePool instance which is a NOOP however it also calls
shutdown which effects a stop of the global pool (this stops all the registered
services and then clears the pool.
{code}
protected void doStop() throws Exception {
// when stopping we intend to shutdown
ServiceHelper.stopAndShutdownService(pool);
try {
ServiceHelper.stopAndShutdownServices(producers.values());
} finally {
// ensure producers are removed, and also from JMX
for (Producer producer : producers.values()) {
getCamelContext().removeService(producer);
}
}
producers.clear();
}
{code}
However no call to `context.removeService(Producer) is called for the entries
from the pool only those singleton instances that were in the `producers` map
hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes
{code} getCamelContext().addService(answer, false);
{code} is never removed.
Since the global pool is empty when the next request to get a producer is
called a new producer is created, jmx wrapper and all, whilst the old instance
remains orphaned retaining any objects that pertain to that instance.
One workaround is for the producer to call
{code}getEndpoint().getCamelContext().removeService(this){code} in it's stop
method, however this is fairly obscure and it would probably be better to
invoke removal of the producer when it is removed from the shared pool.
Another issue of note is that when a route is shutdown that contains a
SendProcessor due to the shutdown invocation on the SharedProcessorServicePool
the global pool is cleared of `everything`.
h4. Impact
For general use where there is no dynamic creation or passivation of routes
this issue should be minimal, however in our use case where the routes are not
static, there is a certain amount of recreation of routes as customer endpoints
change and there is a need to passivate idle routes this causes a considerable
memory leak (via SFTP in particular).
h4. Test Case
{code}
package org.apache.camel.component;
import com.google.common.util.concurrent.AtomicLongMap;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Test memory behaviour of producers using {@link ServicePoolAware} when using
JMX.
*/
public class ServicePoolAwareLeakyTest extends CamelTestSupport {
private static final String LEAKY_SIEVE_STABLE =
"leaky://sieve-stable?plugged=true";
private static final String LEAKY_SIEVE_TRANSIENT =
"leaky://sieve-transient?plugged=true";
private static boolean isPatchApplied() {
return Boolean.parseBoolean(System.getProperty("patchApplied", "false"));
}
/**
* Component that provides leaks producers.
*/
private static class LeakySieveComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String,
Object> parameters) throws Exception {
boolean plugged = "true".equalsIgnoreCase((String)
parameters.remove("plugged"));
return new LeakySieveEndpoint(uri, isPatchApplied() && plugged);
}
}
/**
* Endpoint that provides leaky producers.
*/
private static class LeakySieveEndpoint extends DefaultEndpoint {
private final String uri;
private final boolean plugged;
public LeakySieveEndpoint(String uri, boolean plugged) {
this.uri = checkNotNull(uri, "uri must not be null");
this.plugged = plugged;
}
@Override
public Producer createProducer() throws Exception {
return new LeakySieveProducer(this, plugged);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public boolean isSingleton() {
return true;
}
@Override
protected String createEndpointUri() {
return uri;
}
}
/**
* Leaky producer - implements {@link ServicePoolAware}.
*/
private static class LeakySieveProducer extends DefaultProducer implements
ServicePoolAware {
private final boolean plugged;
public LeakySieveProducer(Endpoint endpoint, boolean plugged) {
super(endpoint);
this.plugged = plugged;
}
@Override
public void process(Exchange exchange) throws Exception {
// do nothing
}
@Override
protected void doStop() throws Exception {
super.doStop();
//noinspection ConstantConditions
if (plugged) {
// need to remove self from services since we are ServicePoolAware this
will not be handled for us otherwise we
// leak memory
getEndpoint().getCamelContext().removeService(this);
}
}
}
@Override
protected boolean useJmx() {
// only occurs when using JMX as the GC root for the producer is through a
ManagedProducer created by the
// context.addService() invocation
return true;
}
/**
* Returns true if verification of state should be performed during the test
as opposed to at the end.
*/
public boolean isFailFast() {
return false;
}
/**
* Returns true if during fast failure we should verify that the service pool
remains in the started state.
*/
public boolean isVerifyProducerServicePoolRemainsStarted() {
return false;
}
@Override
public boolean isUseAdviceWith() {
return true;
}
@Test
public void testForMemoryLeak() throws Exception {
registerLeakyComponent();
final AtomicLongMap<String> references = AtomicLongMap.create();
// track LeakySieveProducer lifecycle
context.addLifecycleStrategy(new LifecycleStrategySupport() {
@Override
public void onServiceAdd(CamelContext context, Service service, Route
route) {
if (service instanceof LeakySieveProducer) {
references.incrementAndGet(((LeakySieveProducer)
service).getEndpoint().getEndpointKey());
}
}
@Override
public void onServiceRemove(CamelContext context, Service service, Route
route) {
if (service instanceof LeakySieveProducer) {
references.decrementAndGet(((LeakySieveProducer)
service).getEndpoint().getEndpointKey());
}
}
});
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:sieve-transient")
.id("sieve-transient")
.to(LEAKY_SIEVE_TRANSIENT);
from("direct:sieve-stable")
.id("sieve-stable")
.to(LEAKY_SIEVE_STABLE);
}
});
context.start();
for (int i = 0; i < 1000; i++) {
ServiceSupport service = (ServiceSupport)
context.getProducerServicePool();
assertEquals(ServiceStatus.Started, service.getStatus());
if (isFailFast()) {
assertEquals(2, context.getProducerServicePool().size());
assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT));
assertEquals(1, references.get(LEAKY_SIEVE_STABLE));
}
context.stopRoute("sieve-transient");
if (isFailFast()) {
assertEquals("Expected no service references to remain", 0,
references.get(LEAKY_SIEVE_TRANSIENT));
}
if (isFailFast()) {
// looks like we cleared more than just our route, we've stopped and
cleared the global ProducerServicePool
// since SendProcessor.stop() invokes
ServiceHelper.stopServices(producerCache, producer); which in turn invokes
// ServiceHelper.stopAndShutdownService(pool);.
//
// Whilst stop on the SharedProducerServicePool is a NOOP shutdown is
not and effects a stop of the pool.
if (isVerifyProducerServicePoolRemainsStarted()) {
assertEquals(ServiceStatus.Started, service.getStatus());
}
assertEquals("Expected one stable producer to remain pooled", 1,
context.getProducerServicePool().size());
assertEquals("Expected one stable producer to remain as service", 1,
references.get(LEAKY_SIEVE_STABLE));
}
// Send a body to verify behaviour of send producer after another route
has been stopped
sendBody("direct:sieve-stable", "");
if (isFailFast()) {
// shared pool is used despite being 'Stopped'
if (isVerifyProducerServicePoolRemainsStarted()) {
assertEquals(ServiceStatus.Started, service.getStatus());
}
assertEquals("Expected only stable producer in pool", 1,
context.getProducerServicePool().size());
assertEquals("Expected no references to transient producer", 0,
references.get(LEAKY_SIEVE_TRANSIENT));
assertEquals("Expected reference to stable producer", 1,
references.get(LEAKY_SIEVE_STABLE));
}
context.startRoute("sieve-transient");
// ok, back to normal
assertEquals(ServiceStatus.Started, service.getStatus());
if (isFailFast()) {
assertEquals("Expected both producers in pool", 2,
context.getProducerServicePool().size());
assertEquals("Expected one transient producer as service", 1,
references.get(LEAKY_SIEVE_TRANSIENT));
assertEquals("Expected one stable producer as service", 1,
references.get(LEAKY_SIEVE_STABLE));
}
}
if (!isFailFast()) {
assertEquals("Expected both producers in pool", 2,
context.getProducerServicePool().size());
// if not fixed these will equal the number of iterations in the loop + 1
assertEquals("Expected one transient producer as service", 1,
references.get(LEAKY_SIEVE_TRANSIENT));
assertEquals("Expected one stable producer as service", 1,
references.get(LEAKY_SIEVE_STABLE));
}
}
private void registerLeakyComponent() {
// register leaky component
context.addComponent("leaky", new LeakySieveComponent());
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)