[
https://issues.apache.org/jira/browse/CAMEL-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen resolved CAMEL-9143.
--------------------------------
Resolution: Fixed
Fix Version/s: 2.14.4
> Producers that implement the ServicePoolAware interface cause memory leak due
> to JMX references
> -----------------------------------------------------------------------------------------------
>
> Key: CAMEL-9143
> URL: https://issues.apache.org/jira/browse/CAMEL-9143
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.14.1, 2.14.2, 2.15.0, 2.15.1
> Reporter: Bob Browning
> Assignee: Claus Ibsen
> Fix For: 2.16.0, 2.14.4, 2.15.4
>
>
> h4. Description
> Producer instances that implement the ServicePoolAware interface will leak
> memory if their route is stopped, with new producers being leaked every time
> the route is started/stopped.
> Known implementations that are affected are RemoteFileProducer (ftp, sftp)
> and Mina2Producer.
> This is due to the behaviour of the SendProcessor, which shuts down its
> `producerCache` instance when the route is stopped.
> {code}
> protected void doStop() throws Exception {
> ServiceHelper.stopServices(producerCache, producer);
> }
> {code}
> This in turn calls `stopAndShutdownService(pool)`, which calls stop on the
> SharedProducerServicePool instance (a NOOP) but also calls shutdown, which
> effects a stop of the global pool (this stops all the registered services
> and then clears the pool).
> {code}
> protected void doStop() throws Exception {
> // when stopping we intend to shutdown
> ServiceHelper.stopAndShutdownService(pool);
> try {
> ServiceHelper.stopAndShutdownServices(producers.values());
> } finally {
> // ensure producers are removed, and also from JMX
> for (Producer producer : producers.values()) {
> getCamelContext().removeService(producer);
> }
> }
> producers.clear();
> }
> {code}
> However, `context.removeService(Producer)` is never called for the entries
> from the pool, only for those singleton instances that were in the `producers`
> map; hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes
> {code} getCamelContext().addService(answer, false);
> {code} is never removed.
> Since the global pool is empty when the next request to get a producer is
> made, a new producer is created, JMX wrapper and all, whilst the old
> instance remains orphaned, retaining any objects that pertain to that instance.
> One workaround is for the producer to call
> {code}getEndpoint().getCamelContext().removeService(this){code} in its stop
> method; however, this is fairly obscure and it would probably be better to
> invoke removal of the producer when it is removed from the shared pool.
> Another issue of note is that when a route containing a SendProcessor is
> shut down, the shutdown invocation on the SharedProducerServicePool clears
> the global pool of `everything`, and the pool remains in the `Stopped` state
> until another route starts it (although it is still accessed and used whilst
> in the `Stopped` state).
> h4. Impact
> For general use where there is no dynamic creation or passivation of routes,
> the impact of this issue should be minimal. However, in our use case the
> routes are not static: routes are recreated as customer endpoints change, and
> idle routes need to be passivated. This causes a considerable memory leak
> (via SFTP in particular).
> h4. Test Case
> {code}
> package org.apache.camel.component;
> import com.google.common.util.concurrent.AtomicLongMap;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Consumer;
> import org.apache.camel.Endpoint;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.Producer;
> import org.apache.camel.Route;
> import org.apache.camel.Service;
> import org.apache.camel.ServicePoolAware;
> import org.apache.camel.ServiceStatus;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultComponent;
> import org.apache.camel.impl.DefaultEndpoint;
> import org.apache.camel.impl.DefaultProducer;
> import org.apache.camel.support.LifecycleStrategySupport;
> import org.apache.camel.support.ServiceSupport;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
> import java.util.Map;
> import static com.google.common.base.Preconditions.checkNotNull;
> /**
> * Test memory behaviour of producers using {@link ServicePoolAware} when
> using JMX.
> */
> public class ServicePoolAwareLeakyTest extends CamelTestSupport {
> private static final String LEAKY_SIEVE_STABLE =
> "leaky://sieve-stable?plugged=true";
> private static final String LEAKY_SIEVE_TRANSIENT =
> "leaky://sieve-transient?plugged=true";
> private static boolean isPatchApplied() {
> return Boolean.parseBoolean(System.getProperty("patchApplied", "false"));
> }
> /**
> * Component that provides leaky producers.
> */
> private static class LeakySieveComponent extends DefaultComponent {
> @Override
> protected Endpoint createEndpoint(String uri, String remaining,
> Map<String, Object> parameters) throws Exception {
> boolean plugged = "true".equalsIgnoreCase((String)
> parameters.remove("plugged"));
> return new LeakySieveEndpoint(uri, isPatchApplied() && plugged);
> }
> }
> /**
> * Endpoint that provides leaky producers.
> */
> private static class LeakySieveEndpoint extends DefaultEndpoint {
> private final String uri;
> private final boolean plugged;
> public LeakySieveEndpoint(String uri, boolean plugged) {
> this.uri = checkNotNull(uri, "uri must not be null");
> this.plugged = plugged;
> }
> @Override
> public Producer createProducer() throws Exception {
> return new LeakySieveProducer(this, plugged);
> }
> @Override
> public Consumer createConsumer(Processor processor) throws Exception {
> throw new UnsupportedOperationException();
> }
> @Override
> public boolean isSingleton() {
> return true;
> }
> @Override
> protected String createEndpointUri() {
> return uri;
> }
> }
> /**
> * Leaky producer - implements {@link ServicePoolAware}.
> */
> private static class LeakySieveProducer extends DefaultProducer implements
> ServicePoolAware {
> private final boolean plugged;
> public LeakySieveProducer(Endpoint endpoint, boolean plugged) {
> super(endpoint);
> this.plugged = plugged;
> }
> @Override
> public void process(Exchange exchange) throws Exception {
> // do nothing
> }
> @Override
> protected void doStop() throws Exception {
> super.doStop();
> //noinspection ConstantConditions
> if (plugged) {
> // need to remove self from services since we are ServicePoolAware
> this will not be handled for us otherwise we
> // leak memory
> getEndpoint().getCamelContext().removeService(this);
> }
> }
> }
> @Override
> protected boolean useJmx() {
> // only occurs when using JMX as the GC root for the producer is through
> a ManagedProducer created by the
> // context.addService() invocation
> return true;
> }
> /**
> * Returns true if verification of state should be performed during the
> test as opposed to at the end.
> */
> public boolean isFailFast() {
> return false;
> }
> /**
> * Returns true if during fast failure we should verify that the service
> pool remains in the started state.
> */
> public boolean isVerifyProducerServicePoolRemainsStarted() {
> return false;
> }
> @Override
> public boolean isUseAdviceWith() {
> return true;
> }
> @Test
> public void testForMemoryLeak() throws Exception {
> registerLeakyComponent();
> final AtomicLongMap<String> references = AtomicLongMap.create();
> // track LeakySieveProducer lifecycle
> context.addLifecycleStrategy(new LifecycleStrategySupport() {
> @Override
> public void onServiceAdd(CamelContext context, Service service, Route
> route) {
> if (service instanceof LeakySieveProducer) {
> references.incrementAndGet(((LeakySieveProducer)
> service).getEndpoint().getEndpointKey());
> }
> }
> @Override
> public void onServiceRemove(CamelContext context, Service service,
> Route route) {
> if (service instanceof LeakySieveProducer) {
> references.decrementAndGet(((LeakySieveProducer)
> service).getEndpoint().getEndpointKey());
> }
> }
> });
> context.addRoutes(new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> from("direct:sieve-transient")
> .id("sieve-transient")
> .to(LEAKY_SIEVE_TRANSIENT);
> from("direct:sieve-stable")
> .id("sieve-stable")
> .to(LEAKY_SIEVE_STABLE);
> }
> });
> context.start();
> for (int i = 0; i < 1000; i++) {
> ServiceSupport service = (ServiceSupport)
> context.getProducerServicePool();
> assertEquals(ServiceStatus.Started, service.getStatus());
> if (isFailFast()) {
> assertEquals(2, context.getProducerServicePool().size());
> assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT));
> assertEquals(1, references.get(LEAKY_SIEVE_STABLE));
> }
> context.stopRoute("sieve-transient");
> if (isFailFast()) {
> assertEquals("Expected no service references to remain", 0,
> references.get(LEAKY_SIEVE_TRANSIENT));
> }
> if (isFailFast()) {
> // looks like we cleared more than just our route, we've stopped and
> cleared the global ProducerServicePool
> // since SendProcessor.stop() invokes
> ServiceHelper.stopServices(producerCache, producer); which in turn invokes
> // ServiceHelper.stopAndShutdownService(pool);.
> //
> // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is
> not and effects a stop of the pool.
> if (isVerifyProducerServicePoolRemainsStarted()) {
> assertEquals(ServiceStatus.Started, service.getStatus());
> }
> assertEquals("Expected one stable producer to remain pooled", 1,
> context.getProducerServicePool().size());
> assertEquals("Expected one stable producer to remain as service", 1,
> references.get(LEAKY_SIEVE_STABLE));
> }
> // Send a body to verify behaviour of send producer after another route
> has been stopped
> sendBody("direct:sieve-stable", "");
> if (isFailFast()) {
> // shared pool is used despite being 'Stopped'
> if (isVerifyProducerServicePoolRemainsStarted()) {
> assertEquals(ServiceStatus.Started, service.getStatus());
> }
> assertEquals("Expected only stable producer in pool", 1,
> context.getProducerServicePool().size());
> assertEquals("Expected no references to transient producer", 0,
> references.get(LEAKY_SIEVE_TRANSIENT));
> assertEquals("Expected reference to stable producer", 1,
> references.get(LEAKY_SIEVE_STABLE));
> }
> context.startRoute("sieve-transient");
> // ok, back to normal
> assertEquals(ServiceStatus.Started, service.getStatus());
> if (isFailFast()) {
> assertEquals("Expected both producers in pool", 2,
> context.getProducerServicePool().size());
> assertEquals("Expected one transient producer as service", 1,
> references.get(LEAKY_SIEVE_TRANSIENT));
> assertEquals("Expected one stable producer as service", 1,
> references.get(LEAKY_SIEVE_STABLE));
> }
> }
> if (!isFailFast()) {
> assertEquals("Expected both producers in pool", 2,
> context.getProducerServicePool().size());
> // if not fixed these will equal the number of iterations in the loop +
> 1
> assertEquals("Expected one transient producer as service", 1,
> references.get(LEAKY_SIEVE_TRANSIENT));
> assertEquals("Expected one stable producer as service", 1,
> references.get(LEAKY_SIEVE_STABLE));
> }
> }
> private void registerLeakyComponent() {
> // register leaky component
> context.addComponent("leaky", new LeakySieveComponent());
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)