Author: davsclaus
Date: Sun Dec 20 13:53:01 2009
New Revision: 892595
URL: http://svn.apache.org/viewvc?rev=892595&view=rev
Log:
CAMEL-1483: Graceful shutdown is now JMX managed, on CamelContext.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
(contents, props changed)
- copied, changed from r892587,
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java
- copied, changed from r892587,
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedShutdownStrategyTest.java
- copied, changed from r892586,
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedErrorHandlerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=892595&r1=892594&r2=892595&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Sun
Dec 20 13:53:01 2009
@@ -653,10 +653,10 @@
*
* @return the strategy
*/
- public ShutdownStrategy getShutdownStrategy();
+ ShutdownStrategy getShutdownStrategy();
/**
- * Sets a custom shtudown strategy
+ * Sets a custom shutdown strategy
*
* @param shutdownStrategy the custom strategy
*/
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=892595&r1=892594&r2=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Sun Dec 20 13:53:01 2009
@@ -26,11 +26,11 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
@@ -85,7 +85,7 @@
return false;
}
- public int getPendingExchanges() {
+ public int getPendingExchangesSize() {
// number of pending messages on the queue
return endpoint.getQueue().size();
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=892595&r1=892594&r2=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
Sun Dec 20 13:53:01 2009
@@ -36,7 +36,7 @@
import org.apache.commons.logging.LogFactory;
/**
- * Default shutdowns strategy which supports graceful shutdown.
+ * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses
graceful shutdown.
* <p/>
* Graceful shutdown ensures that any inflight and pending messages will be
taken into account
* and it will wait until these exchanges has been completed.
@@ -101,44 +101,47 @@
LOG.info("Graceful shutdown of routes completed in " + seconds + "
seconds");
}
- /**
- * Set an timeout to wait for the shutdown to complete.
- * <p/>
- * Setting a value of 0 or negative will disable timeout and wait until
complete
- * (potential blocking forever)
- *
- * @param timeout timeout in millis
- */
public void setTimeout(long timeout) {
this.timeout = timeout;
}
- /**
- * Set the time unit to use
- *
- * @param timeUnit the unit to use
- */
+ public long getTimeout() {
+ return timeout;
+ }
+
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
- /**
- * Sets whether to force shutdown of all consumers when a timeout occurred
and thus
- * not all consumers was shutdown within that period.
- *
- * @param shutdownNowOnTimeout <tt>true</tt> to force shutdown,
<tt>false</tt> to leave them running
- */
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
this.shutdownNowOnTimeout = shutdownNowOnTimeout;
}
+ public boolean isShutdownNowOnTimeout() {
+ return shutdownNowOnTimeout;
+ }
+
+ /**
+ * Shutdown all the consumers immediately.
+ *
+ * @param consumers the consumers to shutdown
+ */
protected void shutdownNow(List<Consumer> consumers) {
for (Consumer consumer : consumers) {
- shutdownConsumer(consumer);
+ shutdownNow(consumer);
}
}
- protected void shutdownConsumer(Consumer consumer) {
+ /**
+ * Shutdown the consumer immediately.
+ *
+ * @param consumer the consumer to shutdown
+ */
+ protected void shutdownNow(Consumer consumer) {
if (LOG.isTraceEnabled()) {
LOG.trace("Shutting down: " + consumer);
}
@@ -176,6 +179,9 @@
executor = null;
}
+ /**
+ * Shutdown task which shutdown all the routes in a graceful manner.
+ */
class ShutdownTask implements Runnable {
private final CamelContext context;
@@ -204,7 +210,7 @@
}
if (shutdown) {
- shutdownConsumer(consumer);
+ shutdownNow(consumer);
} else {
// we will stop it later, but for now it must run to be
able to help all inflight messages
// be safely completed
@@ -221,7 +227,7 @@
// include any additional pending exchanges on some
consumers which may have internal
// memory queues such as seda
if (consumer instanceof ShutdownAware) {
- size += ((ShutdownAware)
consumer).getPendingExchanges();
+ size += ((ShutdownAware)
consumer).getPendingExchangesSize();
}
}
if (size > 0) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=892595&r1=892594&r2=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
Sun Dec 20 13:53:01 2009
@@ -17,6 +17,7 @@
package org.apache.camel.management.mbean;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
@@ -91,6 +92,36 @@
return context.getInflightRepository().size();
}
+ @ManagedAttribute(description = "Shutdown timeout")
+ public void setTimeout(long timeout) {
+ context.getShutdownStrategy().setTimeout(timeout);
+ }
+
+ @ManagedAttribute(description = "Shutdown timeout")
+ public long getTimeout() {
+ return context.getShutdownStrategy().getTimeout();
+ }
+
+ @ManagedAttribute(description = "Shutdown timeout time unit")
+ public void setTimeUnit(TimeUnit timeUnit) {
+ context.getShutdownStrategy().setTimeUnit(timeUnit);
+ }
+
+ @ManagedAttribute(description = "Shutdown timeout time unit")
+ public TimeUnit getTimeUnit() {
+ return context.getShutdownStrategy().getTimeUnit();
+ }
+
+ @ManagedAttribute(description = "Whether to force shutdown now when a
timeout occurred")
+ public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
+
context.getShutdownStrategy().setShutdownNowOnTimeout(shutdownNowOnTimeout);
+ }
+
+ @ManagedAttribute(description = "Whether to force shutdown now when a
timeout occurred")
+ public boolean isShutdownNowOnTimeout() {
+ return context.getShutdownStrategy().isShutdownNowOnTimeout();
+ }
+
@ManagedOperation(description = "Start Camel")
public void start() throws Exception {
context.start();
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java?rev=892595&r1=892594&r2=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownAware.java
Sun Dec 20 13:53:01 2009
@@ -39,11 +39,14 @@
boolean deferShutdown();
/**
- * Some consumers have internal queues with {@link
org.apache.camel.Exchange} which are pending.
+ * Gets the number of pending exchanges.
+ * <p/>
+ * Some consumers has internal queues with {@link
org.apache.camel.Exchange} which are pending.
+ * For example the {@link org.apache.camel.component.seda.SedaConsumer}.
* <p/>
* Return <tt>zero</tt> to indicate no pending exchanges and therefore
ready to shutdown.
*
* @return number of pending exchanges
*/
- int getPendingExchanges();
+ int getPendingExchangesSize();
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java?rev=892595&r1=892594&r2=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ShutdownStrategy.java
Sun Dec 20 13:53:01 2009
@@ -17,9 +17,11 @@
package org.apache.camel.spi;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
+import org.apache.camel.Service;
/**
* Pluggable shutdown strategy executed during shutdown of routes.
@@ -37,7 +39,7 @@
* @version $Revision$
* @see org.apache.camel.spi.ShutdownAware
*/
-public interface ShutdownStrategy {
+public interface ShutdownStrategy extends Service {
/**
* Shutdown the routes
@@ -48,4 +50,51 @@
*/
void shutdown(CamelContext context, List<Consumer> consumers) throws
Exception;
+ /**
+ * Set an timeout to wait for the shutdown to complete.
+ * <p/>
+ * Setting a value of 0 or negative will disable timeout and wait until
complete
+ * (potential blocking forever)
+ *
+ * @param timeout timeout in millis
+ */
+ void setTimeout(long timeout);
+
+ /**
+ * Gets the timeout.
+ * <p/>
+ * Use 0 or a negative value to disable timeout
+ *
+ * @return the timeout
+ */
+ long getTimeout();
+
+ /**
+ * Set the time unit to use
+ *
+ * @param timeUnit the unit to use
+ */
+ void setTimeUnit(TimeUnit timeUnit);
+
+ /**
+ * Gets the time unit used
+ *
+ * @return the time unit
+ */
+ TimeUnit getTimeUnit();
+
+ /**
+ * Sets whether to force shutdown of all consumers when a timeout occurred
and thus
+ * not all consumers was shutdown within that period.
+ *
+ * @param shutdownNowOnTimeout <tt>true</tt> to force shutdown,
<tt>false</tt> to leave them running
+ */
+ void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout);
+
+ /**
+ * whether to force shutdown of all consumers when a timeout occurred.
+ *
+ * @return force shutdown or not
+ */
+ boolean isShutdownNowOnTimeout();
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
(from r892587,
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java&r1=892587&r2=892595&rev=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
Sun Dec 20 13:53:01 2009
@@ -24,11 +24,14 @@
/**
* @version $Revision$
*/
-public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
+public class ShutdownGracefuTimeoutTriggerTest extends ContextTestSupport {
private static String foo = "";
public void testShutdownGraceful() throws Exception {
+ // timeout after 2 seconds
+ context.getShutdownStrategy().setTimeout(2);
+
getMockEndpoint("mock:foo").expectedMessageCount(1);
template.sendBody("seda:foo", "A");
@@ -43,8 +46,8 @@
foo = foo + "stop";
context.stop();
- // it should wait as there was 1 inflight exchange and 4 pending
messages left
- assertEquals("Should graceful shutdown", "stopABCDE", foo);
+ // should not be able to complete all messages as timeout occurred
+ assertNotSame("Should not able able to complete all pending messages",
"stopABCDE", foo);
}
@Override
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefuTimeoutTriggerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java
(from r892587,
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java&r1=892587&r2=892595&rev=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ShutdownGracefulWithTimerTest.java
Sun Dec 20 13:53:01 2009
@@ -24,12 +24,14 @@
/**
* @version $Revision$
*/
-public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
+public class ShutdownGracefulWithTimerTest extends ContextTestSupport {
private static String foo = "";
public void testShutdownGraceful() throws Exception {
getMockEndpoint("mock:foo").expectedMessageCount(1);
+ // should be stopped before it fires the first one
+ getMockEndpoint("mock:timer").expectedMessageCount(0);
template.sendBody("seda:foo", "A");
template.sendBody("seda:foo", "B");
@@ -52,6 +54,8 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ from("timer:foo?period=500&delay=2000").to("mock:timer");
+
from("seda:foo").to("mock:foo").delay(1000).process(new
Processor() {
public void process(Exchange exchange) throws Exception {
foo = foo + exchange.getIn().getBody(String.class);
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedShutdownStrategyTest.java
(from r892586,
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedErrorHandlerTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedShutdownStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedShutdownStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedErrorHandlerTest.java&r1=892586&r2=892595&rev=892595&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedErrorHandlerTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedShutdownStrategyTest.java
Sun Dec 20 13:53:01 2009
@@ -16,8 +16,7 @@
*/
package org.apache.camel.management;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -28,7 +27,7 @@
/**
* @version $Revision$
*/
-public class ManagedErrorHandlerTest extends ContextTestSupport {
+public class ManagedShutdownStrategyTest extends ContextTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
@@ -39,22 +38,16 @@
return context;
}
- public void testManagedErrorHandler() throws Exception {
+ public void testManagedShutdownStrategy() throws Exception {
MBeanServer mbeanServer =
context.getManagementStrategy().getManagementAgent().getMBeanServer();
- Set<ObjectName> set = mbeanServer.queryNames(new
ObjectName("*:type=errorhandlers,*"), null);
- // there should only be 2 error handler types as route 1 and route 3
uses the same default error handler
- assertEquals(2, set.size());
-
- Iterator<ObjectName> it = set.iterator();
- ObjectName on1 = it.next();
- ObjectName on2 = it.next();
+ ObjectName on =
ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=context,name=\"camel-1\"");
- String name1 = on1.getCanonicalName();
- String name2 = on2.getCanonicalName();
+ Long timeout = (Long) mbeanServer.getAttribute(on, "Timeout");
+ assertEquals(300, timeout.longValue());
- assertTrue("Should be a default error handler",
name1.contains("CamelDefaultErrorHandlerBuilder") ||
name2.contains("CamelDefaultErrorHandlerBuilder"));
- assertTrue("Should be a dead letter error handler",
name1.contains("DeadLetterChannelBuilder") ||
name2.contains("DeadLetterChannelBuilder"));
+ TimeUnit unit = (TimeUnit) mbeanServer.getAttribute(on, "TimeUnit");
+ assertEquals("seconds", unit.toString().toLowerCase());
}
@Override
@@ -63,11 +56,7 @@
@Override
public void configure() throws Exception {
from("direct:foo").to("mock:foo");
-
-
from("direct:bar").errorHandler(deadLetterChannel("mock:dead")).to("mock:bar");
-
- from("direct:baz").to("mock:baz");
}
};
}
-}
+}
\ No newline at end of file