Author: davsclaus
Date: Fri Nov 4 10:16:12 2011
New Revision: 1197484
URL: http://svn.apache.org/viewvc?rev=1197484&view=rev
Log:
CAMEL-4614: Added purge method to JMX seda endpoint
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java
- copied, changed from r1197409,
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1197484&r1=1197483&r2=1197484&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Fri Nov 4 10:16:12 2011
@@ -32,6 +32,9 @@ import org.apache.camel.MultipleConsumer
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.BrowsableEndpoint;
@@ -42,6 +45,7 @@ import org.apache.camel.util.ServiceHelp
* href="http://camel.apache.org/queue.html">Queue components</a> for
* asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
*/
+@ManagedResource(description = "Managed SedaEndpoint")
public class SedaEndpoint extends DefaultEndpoint implements
BrowsableEndpoint, MultipleConsumersSupport {
private volatile BlockingQueue<Exchange> queue;
private int size;
@@ -133,6 +137,7 @@ public class SedaEndpoint extends Defaul
this.size = queue.remainingCapacity();
}
+ @ManagedAttribute(description = "Queue max capacity")
public int getSize() {
return size;
}
@@ -141,10 +146,16 @@ public class SedaEndpoint extends Defaul
this.size = size;
}
+ @ManagedAttribute(description = "Current queue size")
+ public int getCurrentQueueSize() {
+ return queue.size();
+ }
+
public void setBlockWhenFull(boolean blockWhenFull) {
this.blockWhenFull = blockWhenFull;
}
+ @ManagedAttribute(description = "Whether the caller will block sending to
a full queue")
public boolean isBlockWhenFull() {
return blockWhenFull;
}
@@ -153,10 +164,12 @@ public class SedaEndpoint extends Defaul
this.concurrentConsumers = concurrentConsumers;
}
+ @ManagedAttribute(description = "Number of concurrent consumers")
public int getConcurrentConsumers() {
return concurrentConsumers;
}
+ @ManagedAttribute
public WaitForTaskToComplete getWaitForTaskToComplete() {
return waitForTaskToComplete;
}
@@ -165,6 +178,7 @@ public class SedaEndpoint extends Defaul
this.waitForTaskToComplete = waitForTaskToComplete;
}
+ @ManagedAttribute
public long getTimeout() {
return timeout;
}
@@ -173,6 +187,7 @@ public class SedaEndpoint extends Defaul
this.timeout = timeout;
}
+ @ManagedAttribute
public boolean isMultipleConsumers() {
return multipleConsumers;
}
@@ -192,11 +207,20 @@ public class SedaEndpoint extends Defaul
return new ArrayList<Exchange>(getQueue());
}
+ @ManagedAttribute
public boolean isMultipleConsumersSupported() {
return isMultipleConsumers();
}
/**
+ * Purges the queue
+ */
+ @ManagedOperation(description = "Purges the seda queue")
+ public void purgeQueue() {
+ queue.clear();
+ }
+
+ /**
* Returns the current active consumers on this endpoint
*/
public Set<SedaConsumer> getConsumers() {
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java
(from r1197409,
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java&r1=1197409&r2=1197484&rev=1197484&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java
Fri Nov 4 10:16:12 2011
@@ -24,36 +24,41 @@ import org.apache.camel.builder.RouteBui
/**
* @version
*/
-public class ManagedBrowseableEndpointTest extends ManagementTestSupport {
+public class ManagedSedaEndpointTest extends ManagementTestSupport {
- public void testBrowseableEndpoint() throws Exception {
+ public void testSedaEndpoint() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(2);
- template.sendBody("direct:start", "Hello World");
- template.sendBody("direct:start", "Bye World");
+ template.sendBody("seda:start", "Hello World");
+ template.sendBody("seda:start", "Bye World");
assertMockEndpointsSatisfied();
MBeanServer mbeanServer = getMBeanServer();
- ObjectName name =
ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://result\"");
+ ObjectName name =
ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"seda://start\"");
String uri = (String) mbeanServer.getAttribute(name, "EndpointUri");
- assertEquals("mock://result", uri);
+ assertEquals("seda://start", uri);
- Long size = (Long) mbeanServer.invoke(name, "queueSize", null, null);
- assertEquals(2, size.longValue());
+ Long timeout = (Long) mbeanServer.getAttribute(name, "Timeout");
+ assertEquals(30000, timeout.intValue());
- String out = (String) mbeanServer.invoke(name, "browseExchange", new
Object[]{0}, new String[]{"java.lang.Integer"});
- assertNotNull(out);
- assertTrue(out.contains("Hello World"));
-
- out = (String) mbeanServer.invoke(name, "browseExchange", new
Object[]{1}, new String[]{"java.lang.Integer"});
- assertNotNull(out);
- assertTrue(out.contains("Bye World"));
-
- out = (String) mbeanServer.invoke(name, "browseMessageBody", new
Object[]{1}, new String[]{"java.lang.Integer"});
- assertNotNull(out);
- assertEquals("Bye World", out);
+ Integer size = (Integer) mbeanServer.getAttribute(name,
"CurrentQueueSize");
+ assertEquals(0, size.intValue());
+
+ // stop route
+ context.stopRoute("foo");
+
+ // send a message to queue
+ template.sendBody("seda:start", "Hi World");
+
+ size = (Integer) mbeanServer.getAttribute(name, "CurrentQueueSize");
+ assertEquals(1, size.intValue());
+
+ mbeanServer.invoke(name, "purgeQueue", null, null);
+
+ size = (Integer) mbeanServer.getAttribute(name, "CurrentQueueSize");
+ assertEquals(0, size.intValue());
}
@Override
@@ -61,7 +66,7 @@ public class ManagedBrowseableEndpointTe
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("log:foo").to("mock:result");
+
from("seda:start").routeId("foo").to("log:foo").to("mock:result");
}
};
}