Author: gertv
Date: Thu Jul 21 17:23:59 2011
New Revision: 1149275
URL: http://svn.apache.org/viewvc?rev=1149275&view=rev
Log:
Use a global ProcessorFactory instead of a global InterceptStrategy
Added:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
- copied, changed from r1149199,
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
- copied, changed from r1149199,
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala
Removed:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala
Modified:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
Copied:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
(from r1149199,
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala)
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala?p2=servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala&p1=servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala&r1=1149199&r2=1149275&rev=1149275&view=diff
==============================================================================
---
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
(original)
+++
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
Thu Jul 21 17:23:59 2011
@@ -16,30 +16,28 @@
*/
package org.apache.servicemix.core
-import org.apache.camel.spi.InterceptStrategy
-import org.apache.camel.model.ProcessorDefinition
-import org.apache.camel.{Exchange, Processor, CamelContext}
+import org.apache.camel.processor.DelegateAsyncProcessor
+import org.apache.camel.{AsyncCallback, Exchange, Processor, CamelContext}
/**
* The ServiceMix bread crumb strategy adds a header to the message to ensure
we can follow the message throughout
* different routes and processors.
*/
-class BreadcrumbStrategy extends InterceptStrategy {
+class Breadcrumbs extends DelegateProcessorFactory {
- import BreadcrumbStrategy.{hasBreadCrumb, addBreadCrumb}
+ import Breadcrumbs.{hasBreadCrumb, addBreadCrumb}
- def wrapProcessorInInterceptors(context: CamelContext, definition:
ProcessorDefinition[_], target: Processor, nextTarget: Processor) : Processor =
- new Processor() {
- def process(exchange: Exchange) {
- if (!hasBreadCrumb(exchange)) {
- addBreadCrumb(exchange)
- }
- target.process(exchange)
+ def create(delegate: Processor) = new DelegateAsyncProcessor(delegate) {
+ override def process(exchange: Exchange, callback: AsyncCallback) = {
+ if (!hasBreadCrumb(exchange)) {
+ addBreadCrumb(exchange)
}
+ processNext(exchange, callback)
}
+ }
}
-object BreadcrumbStrategy {
+object Breadcrumbs {
/**
* ServiceMix bread crumb header name
@@ -62,4 +60,26 @@ object BreadcrumbStrategy {
def addBreadCrumb(exchange: Exchange) : Unit =
exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB,
exchange.getContext.getUuidGenerator.generateUuid())
+ /**
+ * Enable bread crumbs on the target CamelContext
+ */
+ def enable(context: CamelContext) = {
+ context.getProcessorFactory match {
+ case global: GlobalProcessorFactory => global.addFactory(new Breadcrumbs)
+ case _ => //unable to enable bread crumbs
+ }
+ }
+
+ /**
+ * Disable bread crumbs on the target CamelContext
+ */
+ def disable(context: CamelContext) = {
+ context.getProcessorFactory match {
+ case global: GlobalProcessorFactory => for (breadcrumb <-
global.factories.filter(_.isInstanceOf[Breadcrumbs])) {
+ global.removeFactory(breadcrumb)
+ }
+ case _ => //unable to enable bread crumbs
+ }
+ }
+
}
Added:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala?rev=1149275&view=auto
==============================================================================
---
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala
(added)
+++
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala
Thu Jul 21 17:23:59 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel.{AsyncProcessor, Processor}
+
+/**
+ * Trait to allow creating delegate processor on demand
+ */
+trait DelegateProcessorFactory {
+
+ /**
+ * Create a new AsyncProcessor instance that can delegate part of its work
to the Processor instance provided
+ */
+ def create(delegate: Processor) : AsyncProcessor
+
+}
\ No newline at end of file
Added:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala?rev=1149275&view=auto
==============================================================================
---
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
(added)
+++
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
Thu Jul 21 17:23:59 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel.spi.{RouteContext, ProcessorFactory}
+import org.apache.camel.model.ProcessorDefinition
+import collection.mutable.ListBuffer
+import org.apache.camel.processor.DelegateAsyncProcessor
+import org.apache.camel.{AsyncProcessor, Processor, AsyncCallback, Exchange}
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Global ServiceMix ProcessorFactory implementation, which will take care of
wrapping processors with the additional
+ * functionality provided by the {@link DelegateProcessorFactory} instances
+ */
+class GlobalProcessorFactory extends ProcessorFactory {
+
+ val factories = new ListBuffer[DelegateProcessorFactory]
+ val version = new AtomicInteger(1)
+
+ def addFactory(factory: DelegateProcessorFactory) = triggerUpdate(factories
+= factory);
+ def removeFactory(factory: DelegateProcessorFactory) =
triggerUpdate(factories -= factory);
+
+ def createChildProcessor(context: RouteContext, definition:
ProcessorDefinition[_], mandatory: Boolean) = {
+ nullOrElse(definition.createProcessor(context))(new
GlobalDelegateProcessor(context, definition, _))
+ }
+
+ def createProcessor(context: RouteContext, definition:
ProcessorDefinition[_]) = {
+ nullOrElse(definition.createProcessor(context))(new
GlobalDelegateProcessor(context, definition, _))
+ }
+
+ def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
+ null.asInstanceOf[T]
+ } else {
+ function(value)
+ }
+
+ def triggerUpdate(block: => Unit) = {
+ block
+ version.incrementAndGet()
+ }
+
+ def configure(original: AsyncProcessor) : AsyncProcessor = {
+ factories.foldLeft(original){ (delegate: AsyncProcessor, factory:
DelegateProcessorFactory) =>
+ factory.create(delegate)
+ }
+ }
+
+ class GlobalDelegateProcessor(context: RouteContext, definition:
ProcessorDefinition[_], target: Processor) extends
DelegateAsyncProcessor(target) {
+
+ var currentProcessor =
GlobalProcessorFactory.this.configure(getProcessor())
+ var version = GlobalProcessorFactory.this.version.get()
+
+ override def process(exchange: Exchange, callback: AsyncCallback) = {
+ // let's check if processor factories have changed and reconfigure
things if necessary
+ if (version < GlobalProcessorFactory.this.version.get) {
+ currentProcessor =
GlobalProcessorFactory.this.configure(getProcessor())
+ }
+
+ currentProcessor.process(exchange, callback)
+ }
+
+ override def toString = "ServiceMix Wrapper[" + processor + "]"
+ }
+}
\ No newline at end of file
Modified:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala?rev=1149275&r1=1149274&r2=1149275&view=diff
==============================================================================
---
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
(original)
+++
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
Thu Jul 21 17:23:59 2011
@@ -17,7 +17,7 @@
package org.apache.servicemix.core
import org.apache.camel._
-import model.{RouteDefinition, ChoiceDefinition, WhenDefinition,
ProcessorDefinition}
+import model.{RouteDefinition, ProcessorDefinition}
import processor.DelegateAsyncProcessor
import spi.{RouteContext, ProcessorFactory}
import collection.mutable.LinkedHashMap
Copied:
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
(from r1149199,
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala)
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala?p2=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala&p1=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala&r1=1149199&r2=1149275&rev=1149275&view=diff
==============================================================================
---
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala
(original)
+++
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
Thu Jul 21 17:23:59 2011
@@ -26,20 +26,18 @@ import org.apache.camel.scala.dsl.builde
import scala.collection.JavaConversions.asScalaBuffer
import org.apache.camel.impl.{DefaultCamelContext, DefaultProducerTemplate}
-import org.apache.servicemix.core.BreadcrumbStrategy.{hasBreadCrumb,
getBreadCrumb}
+import org.apache.servicemix.core.Breadcrumbs.{hasBreadCrumb, getBreadCrumb}
import org.scalatest.Assertions._
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
@RunWith(classOf[JUnitRunner])
-class BreadcrumbStrategyTest extends FunSuite with RouteBuilderSupport with
BeforeAndAfterAll with BeforeAndAfterEach {
+class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with
BeforeAndAfterAll with BeforeAndAfterEach {
val messages = List("<gingerbread/>", "<cakes/>", "<sugar/>")
- val globals = new GlobalInterceptStrategy
-
lazy val context = {
val result = new DefaultCamelContext()
- result.addInterceptStrategy(globals)
+ result.setProcessorFactory(new GlobalProcessorFactory)
result.addRoutes(createRouteBuilder())
result.start()
result
@@ -57,7 +55,7 @@ class BreadcrumbStrategyTest extends Fun
}
test("add breadcrumbs to message headers") {
- globals.addStrategy(new BreadcrumbStrategy)
+ Breadcrumbs.enable(context)
for (body <- messages) {
template.sendBody("direct:test", body)
@@ -79,8 +77,7 @@ class BreadcrumbStrategyTest extends Fun
}
test("bread crumb strategy can be disabled if necessary") {
- val breadcrumbs = new BreadcrumbStrategy
- globals.addStrategy(breadcrumbs)
+ Breadcrumbs.enable(context)
for (body <- messages) {
template.sendBody("direct:test", body)
@@ -101,7 +98,7 @@ class BreadcrumbStrategyTest extends Fun
assert(hansels == gretels, "Gretel should be able to find all of Hansel's
bread crumbs")
// let's now disable the bread crumbs and just continue with same
context/processors/...
- globals.removeStrategy(breadcrumbs)
+ Breadcrumbs.disable(context)
MockEndpoint.resetMocks(context)
for (body <- messages) {
@@ -123,7 +120,7 @@ class BreadcrumbStrategyTest extends Fun
override protected def afterEach() = {
MockEndpoint.resetMocks(context)
- globals.strategies.clear()
+
context.getProcessorFactory.asInstanceOf[GlobalProcessorFactory].factories.clear
}
def getMockEndpoint(name: String) = context.getEndpoint(name,
classOf[MockEndpoint])