Author: gnodet
Date: Fri Jul 22 15:48:43 2011
New Revision: 1149640
URL: http://svn.apache.org/viewvc?rev=1149640&view=rev
Log:
Support multiple bread crumbs and wrap the AggregationStrategy accordingly
Added:
servicemix/smx5/trunk/core/src/test/resources/
servicemix/smx5/trunk/core/src/test/resources/log4j.properties
Modified:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
Modified:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala?rev=1149640&r1=1149639&r2=1149640&view=diff
==============================================================================
---
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
(original)
+++
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
Fri Jul 22 15:48:43 2011
@@ -16,8 +16,11 @@
*/
package org.apache.servicemix.core
-import org.apache.camel.processor.DelegateAsyncProcessor
import org.apache.camel.{AsyncCallback, Exchange, Processor, CamelContext}
+import org.apache.camel.processor.{DelegateProcessor, DelegateAsyncProcessor}
+import org.apache.camel.processor.aggregate.{AggregationStrategy,
AggregateProcessor}
+import collection.mutable.HashSet
+import collection.Iterable
/**
* The ServiceMix bread crumb strategy adds a header to the message to ensure
we can follow the message throughout
@@ -25,9 +28,9 @@ import org.apache.camel.{AsyncCallback,
*/
class Breadcrumbs extends DelegateProcessorFactory {
- import Breadcrumbs.{hasBreadCrumb, addBreadCrumb}
+ import Breadcrumbs.{hasBreadCrumb, addBreadCrumb, getBreadCrumb}
- def create(delegate: Processor) = new DelegateAsyncProcessor(delegate) {
+ def create(delegate: Processor) = new
DelegateAsyncProcessor(process(delegate)) {
override def process(exchange: Exchange, callback: AsyncCallback) = {
if (!hasBreadCrumb(exchange)) {
addBreadCrumb(exchange)
@@ -35,6 +38,29 @@ class Breadcrumbs extends DelegateProces
processNext(exchange, callback)
}
}
+
+ private def process(delegate: Processor) : Processor = {
+ var p = delegate
+ if (p.isInstanceOf[DelegateProcessor]) {
+ p = p.asInstanceOf[DelegateProcessor].getProcessor
+ }
+ if (p.isInstanceOf[AggregateProcessor]) {
+ val agg = p.asInstanceOf[AggregateProcessor]
+ val oldstrat = agg.getAggregationStrategy
+ val strategy = new AggregationStrategy {
+ def aggregate(oldExchange: Exchange, newExchange: Exchange) : Exchange
= {
+ val ex = oldstrat.aggregate(oldExchange, newExchange)
+ if (oldExchange == null)
+ addBreadCrumb(ex, List(getBreadCrumb(newExchange)))
+ else
+ addBreadCrumb(ex, List(getBreadCrumb(oldExchange),
getBreadCrumb(newExchange)))
+ ex
+ }
+ }
+ agg.setAggregationStrategy(strategy)
+ }
+ delegate
+ }
}
object Breadcrumbs {
@@ -50,15 +76,43 @@ object Breadcrumbs {
def hasBreadCrumb(exchange: Exchange) : Boolean = getBreadCrumb(exchange) !=
null
/**
- * Get the ServiceMix bread crumb value for an Exchange
+ * Get the ServiceMix bread crumb value for an Exchange (eventually a comma
separated list)
*/
def getBreadCrumb(exchange: Exchange) : String =
exchange.getIn.getHeader(SERVICEMIX_BREAD_CRUMB, classOf[String])
/**
+ * Get the ServiceMix bread crumb values for an Exchange
+ */
+ def getBreadCrumbs(exchange: Exchange) : Set[String] =
getBreadCrumbs(getBreadCrumb(exchange))
+
+ def getBreadCrumbs(breadcrumbs: String) : Set[String] = if (breadcrumbs ==
null) Set[String]() else breadcrumbs.split(",").toSet
+
+ /**
* Add a ServiceMix bread crumb to an Exchange
*/
- def addBreadCrumb(exchange: Exchange) : Unit =
exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB,
-
exchange.getContext.getUuidGenerator.generateUuid())
+ def addBreadCrumb(exchange: Exchange) : Unit = setBreadCrumb(exchange,
exchange.getContext.getUuidGenerator.generateUuid())
+
+ /**
+ * Add a number of ServiceMix bread crumbs to an Exchange
+ */
+ def addBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit
= {
+ var bcs = new HashSet[String]()
+ bcs = bcs ++ getBreadCrumbs(exchange)
+ for (bc <- breadcrumbs) {
+ bcs = bcs ++ getBreadCrumbs(bc)
+ }
+ setBreadCrumb(exchange, bcs)
+ }
+
+ /**
+ * Set the ServiceMix bread crumb to an Exchange
+ */
+ def setBreadCrumb(exchange: Exchange, breadcrumb: String) : Unit =
exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB, breadcrumb)
+
+ /**
+ * Set the ServiceMix bread crumbs to an Exchange
+ */
+ def setBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit
= setBreadCrumb(exchange, breadcrumbs.mkString(","))
/**
* Enable bread crumbs on the target CamelContext
@@ -82,4 +136,15 @@ object Breadcrumbs {
}
}
+ private def nullOrElse[S,T](value: S)(function: S => T) : T = if (value ==
null) {
+ null.asInstanceOf[T]
+ } else {
+ function(value)
+ }
+ private def nullOrElse[S,T](value: S, default: T)(function: S => T) : T = if
(value == null) {
+ default
+ } else {
+ function(value)
+ }
+
}
Modified:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala?rev=1149640&r1=1149639&r2=1149640&view=diff
==============================================================================
---
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
(original)
+++
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
Fri Jul 22 15:48:43 2011
@@ -36,7 +36,7 @@ class GlobalProcessorFactory extends Pro
def removeFactory(factory: DelegateProcessorFactory) =
triggerUpdate(factories -= factory);
def createChildProcessor(context: RouteContext, definition:
ProcessorDefinition[_], mandatory: Boolean) = {
- nullOrElse(definition.createProcessor(context))(new
GlobalDelegateProcessor(context, definition, _))
+ nullOrElse(context.createProcessor(definition))(new
GlobalDelegateProcessor(context, definition, _))
}
def createProcessor(context: RouteContext, definition:
ProcessorDefinition[_]) = {
Added: servicemix/smx5/trunk/core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/resources/log4j.properties?rev=1149640&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/test/resources/log4j.properties (added)
+++ servicemix/smx5/trunk/core/src/test/resources/log4j.properties Fri Jul 22
15:48:43 2011
@@ -0,0 +1,26 @@
+#
+# Copyright (C) 2011, FuseSource Corp. All rights reserved.
+# http://fusesource.com
+#
+# The software in this package is published under the terms of the
+# CDDL license a copy of which has been included with this distribution
+# in the license.txt file.
+#
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=DEBUG, console, file
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=WARN
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true
Modified:
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
URL:
http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala?rev=1149640&r1=1149639&r2=1149640&view=diff
==============================================================================
---
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
(original)
+++
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
Fri Jul 22 15:48:43 2011
@@ -26,9 +26,9 @@ import org.apache.camel.scala.dsl.builde
import scala.collection.JavaConversions.asScalaBuffer
import org.apache.camel.impl.{DefaultCamelContext, DefaultProducerTemplate}
-import org.apache.servicemix.core.Breadcrumbs.{hasBreadCrumb, getBreadCrumb}
-import org.scalatest.Assertions._
+import org.apache.servicemix.core.Breadcrumbs.{hasBreadCrumb, getBreadCrumb,
getBreadCrumbs}
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy
@RunWith(classOf[JUnitRunner])
class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with
BeforeAndAfterAll with BeforeAndAfterEach {
@@ -118,6 +118,23 @@ class BreadcrumbsTest extends FunSuite w
assert(!hasBreadCrumb(exchange), "There should be no more bread crumbs
here")
}
+ test("bread crumb strategy with aggregator") {
+ Breadcrumbs.enable(context)
+
+ for (body <- messages) {
+ template.sendBody("direct:aggregate", body)
+ }
+
+ val aggres = getMockEndpoint("mock:aggres")
+ aggres.expectedMessageCount(1)
+ aggres.assertIsSatisfied()
+
+ val exchange = aggres.getExchanges.get(0)
+ val bcs = getBreadCrumbs(exchange)
+ assert(bcs.size == messages.size, "There should be no more bread crumbs
here")
+ }
+
+
override protected def afterEach() = {
MockEndpoint.resetMocks(context)
context.getProcessorFactory.asInstanceOf[GlobalProcessorFactory].factories.clear
@@ -126,11 +143,18 @@ class BreadcrumbsTest extends FunSuite w
def getMockEndpoint(name: String) = context.getEndpoint(name,
classOf[MockEndpoint])
def createRouteBuilder() = new RouteBuilder {
+ "direct:aggregate" ==> {
+ aggregate (true, new
UseLatestAggregationStrategy()).completionSize(messages.size) {
+ to("mock:aggres")
+ }
+ }
+
"direct:test" ==> {
to("mock:hansel")
to("seda:forest")
}
"seda:forest" to "mock:gretel"
+
}
}
\ No newline at end of file