Author: davsclaus
Date: Fri Nov  4 10:16:12 2011
New Revision: 1197484

URL: http://svn.apache.org/viewvc?rev=1197484&view=rev
Log:
CAMEL-4614: Added purge method to JMX seda endpoint

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java
      - copied, changed from r1197409, 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1197484&r1=1197483&r2=1197484&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
 Fri Nov  4 10:16:12 2011
@@ -32,6 +32,9 @@ import org.apache.camel.MultipleConsumer
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.BrowsableEndpoint;
@@ -42,6 +45,7 @@ import org.apache.camel.util.ServiceHelp
  * href="http://camel.apache.org/queue.html">Queue components</a> for
  * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
  */
+@ManagedResource(description = "Managed SedaEndpoint")
 public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
     private volatile BlockingQueue<Exchange> queue;
     private int size;
@@ -133,6 +137,7 @@ public class SedaEndpoint extends Defaul
         this.size = queue.remainingCapacity();
     }
 
+    @ManagedAttribute(description = "Queue max capacity")
     public int getSize() {
         return size;
     }
@@ -141,10 +146,16 @@ public class SedaEndpoint extends Defaul
         this.size = size;
     }
 
+    @ManagedAttribute(description = "Current queue size")
+    public int getCurrentQueueSize() {
+        return queue.size();
+    }
+
     public void setBlockWhenFull(boolean blockWhenFull) {
         this.blockWhenFull = blockWhenFull;
     }
 
+    @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
     public boolean isBlockWhenFull() {
         return blockWhenFull;
     }
@@ -153,10 +164,12 @@ public class SedaEndpoint extends Defaul
         this.concurrentConsumers = concurrentConsumers;
     }
 
+    @ManagedAttribute(description = "Number of concurrent consumers")
     public int getConcurrentConsumers() {
         return concurrentConsumers;
     }
 
+    @ManagedAttribute
     public WaitForTaskToComplete getWaitForTaskToComplete() {
         return waitForTaskToComplete;
     }
@@ -165,6 +178,7 @@ public class SedaEndpoint extends Defaul
         this.waitForTaskToComplete = waitForTaskToComplete;
     }
 
+    @ManagedAttribute
     public long getTimeout() {
         return timeout;
     }
@@ -173,6 +187,7 @@ public class SedaEndpoint extends Defaul
         this.timeout = timeout;
     }
 
+    @ManagedAttribute
     public boolean isMultipleConsumers() {
         return multipleConsumers;
     }
@@ -192,11 +207,20 @@ public class SedaEndpoint extends Defaul
         return new ArrayList<Exchange>(getQueue());
     }
 
+    @ManagedAttribute
     public boolean isMultipleConsumersSupported() {
         return isMultipleConsumers();
     }
 
     /**
+     * Purges the queue
+     */
+    @ManagedOperation(description = "Purges the seda queue")
+    public void purgeQueue() {
+        queue.clear();
+    }
+
+    /**
      * Returns the current active consumers on this endpoint
      */
     public Set<SedaConsumer> getConsumers() {

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java
 (from r1197409, 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java&r1=1197409&r2=1197484&rev=1197484&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedBrowseableEndpointTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSedaEndpointTest.java
 Fri Nov  4 10:16:12 2011
@@ -24,36 +24,41 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version 
  */
-public class ManagedBrowseableEndpointTest extends ManagementTestSupport {
+public class ManagedSedaEndpointTest extends ManagementTestSupport {
 
-    public void testBrowseableEndpoint() throws Exception {
+    public void testSedaEndpoint() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(2);
 
-        template.sendBody("direct:start", "Hello World");
-        template.sendBody("direct:start", "Bye World");
+        template.sendBody("seda:start", "Hello World");
+        template.sendBody("seda:start", "Bye World");
 
         assertMockEndpointsSatisfied();
 
         MBeanServer mbeanServer = getMBeanServer();
 
-        ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://result\"");
+        ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"seda://start\"");
         String uri = (String) mbeanServer.getAttribute(name, "EndpointUri");
-        assertEquals("mock://result", uri);
+        assertEquals("seda://start", uri);
 
-        Long size = (Long) mbeanServer.invoke(name, "queueSize", null, null);
-        assertEquals(2, size.longValue());
+        Long timeout = (Long) mbeanServer.getAttribute(name, "Timeout");
+        assertEquals(30000, timeout.intValue());
 
-        String out = (String) mbeanServer.invoke(name, "browseExchange", new Object[]{0}, new String[]{"java.lang.Integer"});
-        assertNotNull(out);
-        assertTrue(out.contains("Hello World"));
-
-        out = (String) mbeanServer.invoke(name, "browseExchange", new Object[]{1}, new String[]{"java.lang.Integer"});
-        assertNotNull(out);
-        assertTrue(out.contains("Bye World"));
-
-        out = (String) mbeanServer.invoke(name, "browseMessageBody", new Object[]{1}, new String[]{"java.lang.Integer"});
-        assertNotNull(out);
-        assertEquals("Bye World", out);
+        Integer size = (Integer) mbeanServer.getAttribute(name, "CurrentQueueSize");
+        assertEquals(0, size.intValue());
+
+        // stop route
+        context.stopRoute("foo");
+
+        // send a message to queue
+        template.sendBody("seda:start", "Hi World");
+
+        size = (Integer) mbeanServer.getAttribute(name, "CurrentQueueSize");
+        assertEquals(1, size.intValue());
+
+        mbeanServer.invoke(name, "purgeQueue", null, null);
+
+        size = (Integer) mbeanServer.getAttribute(name, "CurrentQueueSize");
+        assertEquals(0, size.intValue());
     }
 
     @Override
@@ -61,7 +66,7 @@ public class ManagedBrowseableEndpointTe
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("log:foo").to("mock:result");
+                from("seda:start").routeId("foo").to("log:foo").to("mock:result");
             }
         };
     }


Reply via email to