Author: davsclaus
Date: Mon Nov 2 08:35:11 2009
New Revision: 831818
URL: http://svn.apache.org/viewvc?rev=831818&view=rev
Log:
CAMEL-1048: Made SuspendableServie JMX managementable.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
Mon Nov 2 08:35:11 2009
@@ -19,7 +19,6 @@
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
-import org.apache.camel.SuspendableService;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.ServiceHelper;
@@ -43,67 +42,19 @@
}
protected boolean startConsumer(Consumer consumer) throws Exception {
- if (consumer instanceof SuspendableService) {
- SuspendableService ss = (SuspendableService) consumer;
- if (ss.isSuspended()) {
- if (log.isDebugEnabled()) {
- log.debug("Resuming consumer " + consumer);
- }
- ss.resume();
- return true;
- } else {
- return false;
- }
- } else if (consumer instanceof ServiceSupport) {
- ServiceSupport ss = (ServiceSupport) consumer;
- if (ss.getStatus().isStartable()) {
- if (log.isDebugEnabled()) {
- log.debug("Stopping consumer " + consumer);
- }
- consumer.start();
- return true;
- } else {
- return false;
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Stopping consumer " + consumer);
- }
- ServiceHelper.startService(consumer);
- return true;
+ boolean resumed = ServiceHelper.resumeService(consumer);
+ if (resumed && log.isDebugEnabled()) {
+ log.debug("Resuming consumer " + consumer);
}
+ return resumed;
}
protected boolean stopConsumer(Consumer consumer) throws Exception {
- if (consumer instanceof SuspendableService) {
- SuspendableService ss = (SuspendableService) consumer;
- if (!ss.isSuspended()) {
- ss.suspend();
- if (log.isDebugEnabled()) {
- log.debug("Suspending consumer " + consumer);
- }
- return true;
- } else {
- return false;
- }
- } else if (consumer instanceof ServiceSupport) {
- ServiceSupport ss = (ServiceSupport) consumer;
- if (ss.getStatus().isStoppable()) {
- if (log.isDebugEnabled()) {
- log.debug("Stopping consumer " + consumer);
- }
- consumer.stop();
- return true;
- } else {
- return false;
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Stopping consumer " + consumer);
- }
- ServiceHelper.stopService(consumer);
- return true;
+ boolean suspended = ServiceHelper.suspendService(consumer);
+ if (suspended && log.isDebugEnabled()) {
+ log.debug("Suspended consumer " + consumer);
}
+ return suspended;
}
/**
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
Mon Nov 2 08:35:11 2009
@@ -32,6 +32,9 @@
* @version $Revision$
*/
public abstract class ServiceSupport implements Service {
+
+ // TODO: refactor and move me to org.apache.camel.util package
+
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean starting = new AtomicBoolean(false);
private final AtomicBoolean stopping = new AtomicBoolean(false);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
Mon Nov 2 08:35:11 2009
@@ -20,6 +20,7 @@
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.ServiceStatus;
+import org.apache.camel.SuspendableService;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ManagementStrategy;
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -96,4 +97,40 @@
public void stop() throws Exception {
service.stop();
}
+
+ @ManagedAttribute(description = "Whether this service supports suspension")
+ public boolean isSupportSuspension() {
+ return service instanceof SuspendableService;
+ }
+
+ @ManagedAttribute(description = "Whether this service is suspended")
+ public boolean isSuspended() {
+ if (service instanceof SuspendableService) {
+ SuspendableService ss = (SuspendableService) service;
+ return ss.isSuspended();
+ } else {
+ return false;
+ }
+ }
+
+ @ManagedOperation(description = "Suspend Service")
+ public void suspend() throws Exception {
+ if (service instanceof SuspendableService) {
+ SuspendableService ss = (SuspendableService) service;
+ ss.suspend();
+ } else {
+ throw new UnsupportedOperationException("suspend() is not a
supported operation");
+ }
+ }
+
+ @ManagedOperation(description = "Resume Service")
+ public void resume() throws Exception {
+ if (service instanceof SuspendableService) {
+ SuspendableService ss = (SuspendableService) service;
+ ss.resume();
+ } else {
+ throw new UnsupportedOperationException("resume() is not a
supported operation");
+ }
+ }
+
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
Mon Nov 2 08:35:11 2009
@@ -21,6 +21,8 @@
import java.util.List;
import org.apache.camel.Service;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,6 +87,9 @@
public static void stopService(Object value) throws Exception {
if (value instanceof Service) {
Service service = (Service)value;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stopping service " + value);
+ }
service.stop();
} else if (value instanceof Collection) {
stopServices((Collection)value);
@@ -116,4 +121,89 @@
throw firstException;
}
}
+
+ /**
+ * Resumes the given service.
+ * <p/>
+ * If the service is a {...@link org.apache.camel.SuspendableService} then
the <tt>resume</tt>
+ * operation is <b>only</b> invoked if the service is suspended.
+ * <p/>
+ * If the service is a {...@link org.apache.camel.impl.ServiceSupport}
then the <tt>start</tt>
+ * operation is <b>only</b> invoked if the service is startable.
+ * <p/>
+ * Otherwise the service is started.
+ *
+ * @param service the service
+ * @return <tt>true</tt> if either <tt>resume</tt> or <tt>start</tt> was
invoked,
+ * <tt>false</tt> if the service is already in the desired state.
+ * @throws Exception is thrown if error occurred
+ */
+ public static boolean resumeService(Service service) throws Exception {
+ if (service instanceof SuspendableService) {
+ SuspendableService ss = (SuspendableService) service;
+ if (ss.isSuspended()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Resuming service " + service);
+ }
+ ss.resume();
+ return true;
+ } else {
+ return false;
+ }
+ } else if (service instanceof ServiceSupport) {
+ ServiceSupport ss = (ServiceSupport) service;
+ if (ss.getStatus().isStartable()) {
+ startService(service);
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ startService(service);
+ return true;
+ }
+ }
+
+ /**
+ * Suspends the given service.
+ * <p/>
+ * If the service is a {...@link org.apache.camel.SuspendableService} then
the <tt>suspend</tt>
+ * operation is <b>only</b> invoked if the service is <b>not</b> suspended.
+ * <p/>
+ * If the service is a {...@link org.apache.camel.impl.ServiceSupport}
then the <tt>stop</tt>
+ * operation is <b>only</b> invoked if the service is stoptable.
+ * <p/>
+ * Otherwise the service is stopped.
+ *
+ * @param service the service
+ * @return <tt>true</tt> if either <tt>suspend</tt> or <tt>stop</tt> was
invoked,
+ * <tt>false</tt> if the service is already in the desired state.
+ * @throws Exception is thrown if error occurred
+ */
+ public static boolean suspendService(Service service) throws Exception {
+ if (service instanceof SuspendableService) {
+ SuspendableService ss = (SuspendableService) service;
+ if (!ss.isSuspended()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Suspending service " + service);
+ }
+ ss.suspend();
+ return true;
+ } else {
+ return false;
+ }
+ } else if (service instanceof ServiceSupport) {
+ ServiceSupport ss = (ServiceSupport) service;
+ if (ss.getStatus().isStoppable()) {
+ stopServices(service);
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ stopService(service);
+ return true;
+ }
+ }
+
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java?rev=831818&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
Mon Nov 2 08:35:11 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.io.File;
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.RoutePolicySupport;
+
+/**
+ * @version $Revision$
+ */
+public class ManagedSuspendedServiceTest extends ContextTestSupport {
+
+ public void testConsumeSuspendAndResumeFile() throws Exception {
+ deleteDirectory("target/suspended");
+
+ MBeanServer mbeanServer =
context.getManagementStrategy().getManagementAgent().getMBeanServer();
+
+ Set<ObjectName> set = mbeanServer.queryNames(new
ObjectName("*:type=consumers,*"), null);
+ assertEquals(1, set.size());
+
+ ObjectName on = set.iterator().next();
+
+ boolean registered = mbeanServer.isRegistered(on);
+ assertEquals("Should be registered", true, registered);
+ Boolean ss = (Boolean) mbeanServer.getAttribute(on,
"SupportSuspension");
+ assertEquals(true, ss.booleanValue());
+ Boolean suspended = (Boolean) mbeanServer.getAttribute(on,
"Suspended");
+ assertEquals(false, suspended.booleanValue());
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Bye World");
+
+ template.sendBodyAndHeader("file://target/suspended", "Bye World",
Exchange.FILE_NAME, "bye.txt");
+ template.sendBodyAndHeader("file://target/suspended", "Hello World",
Exchange.FILE_NAME, "hello.txt");
+
+ assertMockEndpointsSatisfied();
+
+ Thread.sleep(1000);
+
+ // now its suspended by the policy
+ suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
+ assertEquals(true, suspended.booleanValue());
+
+ // the route is suspended by the policy so we should only receive one
+ File file = new File("target/suspended/hello.txt").getAbsoluteFile();
+ assertEquals("The file should exists", true, file.exists());
+
+ // reset mock
+ mock.reset();
+ mock.expectedBodiesReceived("Hello World");
+
+ // now resume it
+ mbeanServer.invoke(on, "resume", null, null);
+
+ assertMockEndpointsSatisfied();
+
+ suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
+ assertEquals(false, suspended.booleanValue());
+
+ Thread.sleep(500);
+
+ // and the file is now moved
+ assertEquals("The file should not exists", false, file.exists());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ MyPolicy myPolicy = new MyPolicy();
+
+
from("file://target/suspended?maxMessagesPerPoll=1&sortBy=file:name")
+ .routePolicy(myPolicy).id("myRoute")
+ .to("mock:result");
+ }
+ };
+ }
+
+ private class MyPolicy extends RoutePolicySupport {
+
+ public void onExchangeDone(Route route, Exchange exchange) {
+ try {
+ super.stopConsumer(route.getConsumer());
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date