Author: davsclaus
Date: Wed Apr 7 09:02:46 2010
New Revision: 931471
URL: http://svn.apache.org/viewvc?rev=931471&view=rev
Log:
CAMEL-2558: ProducerCache is now exposed in JMX.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedProducerCacheTest.java
- copied, changed from r931444,
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSendProcessorTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
Wed Apr 7 09:02:46 2010
@@ -61,8 +61,6 @@ public class MethodInfo {
private ExchangePattern pattern = ExchangePattern.InOut;
private RecipientList recipientList;
- // TODO: This class should extends ServiceSupport so we can cleanup
recipientList when stopping
-
public MethodInfo(CamelContext camelContext, Class<?> type, Method method,
List<ParameterInfo> parameters, List<ParameterInfo> bodyParameters,
boolean hasCustomAnnotation, boolean
hasHandlerAnnotation, boolean voidAsInOnly) {
this.camelContext = camelContext;
@@ -107,6 +105,13 @@ public class MethodInfo {
AggregationStrategy strategy =
CamelContextHelper.mandatoryLookup(camelContext, annotation.strategyRef(),
AggregationStrategy.class);
recipientList.setAggregationStrategy(strategy);
}
+
+ // add created recipientList as a service so we have its lifecycle
managed
+ try {
+ camelContext.addService(recipientList);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
Wed Apr 7 09:02:46 2010
@@ -721,9 +721,9 @@ public class DefaultProducerTemplate ext
protected void doStart() throws Exception {
if (producerCache == null) {
if (maximumCacheSize > 0) {
- producerCache = new ProducerCache(context, maximumCacheSize);
+ producerCache = new ProducerCache(this, context,
maximumCacheSize);
} else {
- producerCache = new ProducerCache(context);
+ producerCache = new ProducerCache(this, context);
}
}
ServiceHelper.startService(producerCache);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Wed Apr 7 09:02:46 2010
@@ -45,29 +45,36 @@ import static org.apache.camel.util.Obje
public class ProducerCache extends ServiceSupport {
private static final transient Log LOG =
LogFactory.getLog(ProducerCache.class);
- // TODO: Expose this cache for management in JMX (also ConsumerCache)
- // TODO: Add source information so we know who uses this cache
- // TODO: Add purge operation to purge the cache
-
private final CamelContext camelContext;
private final ServicePool<Endpoint, Producer> pool;
private final Map<String, Producer> producers;
+ private final Object source;
- public ProducerCache(CamelContext camelContext) {
- this(camelContext,
CamelContextHelper.getMaximumCachePoolSize(camelContext));
+ public ProducerCache(Object source, CamelContext camelContext) {
+ this(source, camelContext,
CamelContextHelper.getMaximumCachePoolSize(camelContext));
}
- public ProducerCache(CamelContext camelContext, int cacheSize) {
- this(camelContext, camelContext.getProducerServicePool(), new
LRUCache<String, Producer>(cacheSize));
+ public ProducerCache(Object source, CamelContext camelContext, int
cacheSize) {
+ this(source, camelContext, camelContext.getProducerServicePool(), new
LRUCache<String, Producer>(cacheSize));
}
- public ProducerCache(CamelContext camelContext, ServicePool<Endpoint,
Producer> producerServicePool, Map<String, Producer> cache) {
+ public ProducerCache(Object source, CamelContext camelContext,
ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer>
cache) {
+ this.source = source;
this.camelContext = camelContext;
this.pool = producerServicePool;
this.producers = cache;
}
/**
+ * Gets the source which uses this cache
+ *
+ * @return the source
+ */
+ public Object getSource() {
+ return source;
+ }
+
+ /**
* Acquires a pooled producer which you <b>must</b> release back again
after usage using the
* {...@link #releaseProducer(org.apache.camel.Endpoint,
org.apache.camel.Producer)} method.
*
@@ -275,10 +282,34 @@ public class ProducerCache extends Servi
*
* @return the current size
*/
- int size() {
+ public int size() {
int size = producers.size();
size += pool.size();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("size = " + size);
+ }
return size;
}
+ /**
+ * Gets the maximum cache size (capacity).
+ * <p/>
+ * Will return -1 if it cannot determine this if a custom cache was used.
+ *
+ * @return the capacity
+ */
+ public int getCapacity() {
+ int capacity = -1;
+ if (producers instanceof LRUCache) {
+ LRUCache cache = (LRUCache) producers;
+ capacity = cache.getMaxCacheSize();
+ }
+ return capacity;
+ }
+
+ @Override
+ public String toString() {
+ return "ProducerCache for source: " + source;
+ }
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
Wed Apr 7 09:02:46 2010
@@ -35,6 +35,7 @@ import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.impl.EventDrivenConsumerRoute;
+import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
@@ -47,6 +48,7 @@ import org.apache.camel.management.mbean
import org.apache.camel.management.mbean.ManagedPerformanceCounter;
import org.apache.camel.management.mbean.ManagedProcessor;
import org.apache.camel.management.mbean.ManagedProducer;
+import org.apache.camel.management.mbean.ManagedProducerCache;
import org.apache.camel.management.mbean.ManagedRoute;
import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
import org.apache.camel.management.mbean.ManagedSendProcessor;
@@ -304,6 +306,8 @@ public class DefaultManagementLifecycleS
return getManagedObjectForProcessor(context, (Processor) service,
route);
} else if (service instanceof ThrottlingInflightRoutePolicy) {
answer = new ManagedThrottlingInflightRoutePolicy(context,
(ThrottlingInflightRoutePolicy) service);
+ } else if (service instanceof ProducerCache) {
+ answer = new ManagedProducerCache(context, (ProducerCache)
service);
} else if (service != null) {
// fallback as generic service
answer = new ManagedService(context, service);
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java?rev=931471&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
Wed Apr 7 09:02:46 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.ProducerCache;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
+/**
+ * @version $Revision$
+ */
+...@managedresource(description = "Managed ProducerCache")
+public class ManagedProducerCache extends ManagedService {
+
+ private final ProducerCache producerCache;
+
+ public ManagedProducerCache(CamelContext context, ProducerCache
producerCache) {
+ super(context, producerCache);
+ this.producerCache = producerCache;
+ }
+
+ public ProducerCache getProducerCache() {
+ return producerCache;
+ }
+
+ @ManagedAttribute(description = "Source")
+ public String getSource() {
+ if (producerCache.getSource() != null) {
+ return producerCache.getSource().toString();
+ }
+ return null;
+ }
+
+ @ManagedAttribute(description = "Number of elements cached")
+ public Integer getSize() {
+ return producerCache.size();
+ }
+
+ @ManagedAttribute(description = "Maximum cache size (capacity)")
+ public Integer getMaximumCacheSize() {
+ return producerCache.getCapacity();
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
Wed Apr 7 09:02:46 2010
@@ -140,7 +140,7 @@ public class RecipientList extends Servi
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(camelContext);
+ producerCache = new ProducerCache(this, camelContext);
// add it as a service so we can manage it
camelContext.addService(producerCache);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
Wed Apr 7 09:02:46 2010
@@ -140,7 +140,7 @@ public class RoutingSlip extends Service
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(camelContext);
+ producerCache = new ProducerCache(this, camelContext);
// add it as a service so we can manage it
camelContext.addService(producerCache);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
Wed Apr 7 09:02:46 2010
@@ -128,7 +128,7 @@ public class SendProcessor extends Servi
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(camelContext);
+ producerCache = new ProducerCache(this, camelContext);
// add it as a service so we can manage it
camelContext.addService(producerCache);
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java?rev=931471&r1=931470&r2=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
Wed Apr 7 09:02:46 2010
@@ -26,7 +26,7 @@ import org.apache.camel.Producer;
public class DefaultProducerCacheTest extends ContextTestSupport {
public void testCacheProducerAcquireAndRelease() throws Exception {
- ProducerCache cache = new ProducerCache(context);
+ ProducerCache cache = new ProducerCache(this, context);
cache.start();
assertEquals("Size should be 0", 0, cache.size());
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedProducerCacheTest.java
(from r931444,
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSendProcessorTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedProducerCacheTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedProducerCacheTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSendProcessorTest.java&r1=931444&r2=931471&rev=931471&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSendProcessorTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedProducerCacheTest.java
Wed Apr 7 09:02:46 2010
@@ -16,19 +16,20 @@
*/
package org.apache.camel.management;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
-public class ManagedSendProcessorTest extends ContextTestSupport {
+public class ManagedProducerCacheTest extends ContextTestSupport {
@Override
protected boolean useJmx() {
@@ -45,11 +46,8 @@ public class ManagedSendProcessorTest ex
}
@SuppressWarnings("unchecked")
- public void testManageSendProcessor() throws Exception {
- MockEndpoint result = getMockEndpoint("mock:result");
- result.expectedMessageCount(1);
- MockEndpoint foo = getMockEndpoint("mock:foo");
- foo.expectedMessageCount(0);
+ public void testManageProducerCache() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:start", "Hello World");
@@ -57,39 +55,27 @@ public class ManagedSendProcessorTest ex
// get the stats for the route
MBeanServer mbeanServer =
context.getManagementStrategy().getManagementAgent().getMBeanServer();
+ Set<ObjectName> set = mbeanServer.queryNames(new
ObjectName("*:type=services,*"), null);
+ assertEquals(6, set.size());
+ List<ObjectName> list = new ArrayList<ObjectName>(set);
+ ObjectName on = null;
+ for (ObjectName name : list) {
+ if (name.getCanonicalName().contains("ProducerCache")) {
+ on = name;
+ break;
+ }
+ }
- // get the object name for the delayer
- ObjectName on =
ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mysend\"");
-
- // should be on route1
- String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
- assertEquals("route1", routeId);
-
- String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
- assertEquals("camel-1", camelId);
-
- String state = (String) mbeanServer.getAttribute(on, "State");
- assertEquals(ServiceStatus.Started.name(), state);
-
- String destination = (String) mbeanServer.getAttribute(on,
"Destination");
- assertEquals("mock://result", destination);
-
- String pattern = (String) mbeanServer.getAttribute(on,
"MessageExchangePattern");
- assertNull(pattern);
-
- // send it somewhere else
- mbeanServer.invoke(on, "changeDestination", new
Object[]{"direct:foo"}, new String[]{"java.lang.String"});
+ assertNotNull("Should have found ProducerCache", on);
- // prepare mocks
- result.reset();
- result.expectedMessageCount(0);
- foo.reset();
- foo.expectedMessageCount(1);
+ Integer max = (Integer) mbeanServer.getAttribute(on,
"MaximumCacheSize");
+ assertEquals(1000, max.intValue());
- // send in another message that should be sent to mock:foo
- template.sendBody("direct:start", "Bye World");
+ Integer current = (Integer) mbeanServer.getAttribute(on, "Size");
+ assertEquals(1, current.intValue());
- assertMockEndpointsSatisfied();
+ String source = (String) mbeanServer.getAttribute(on, "Source");
+ assertEquals("sendTo(Endpoint[mock://result])", source);
}
@Override
@@ -97,12 +83,9 @@ public class ManagedSendProcessorTest ex
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start")
- .to("mock:result").id("mysend");
-
- from("direct:foo").to("mock:foo");
+ from("direct:start").to("mock:result");
}
};
}
-}
+}
\ No newline at end of file