Author: ningjiang
Date: Sun Mar 24 03:31:41 2013
New Revision: 1460260
URL: http://svn.apache.org/r1460260
Log:
CAMEL-6182 merged some implict converters
Added:
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
Modified:
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
Modified:
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala?rev=1460260&r1=1460259&r2=1460260&view=diff
==============================================================================
---
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
(original)
+++
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
Sun Mar 24 03:31:41 2013
@@ -17,6 +17,9 @@
package org.apache.camel
package scala
+import processor.aggregate.AggregationStrategy
+
+object Preamble extends Preamble
/**
* Trait containing common implicit conversion definitions
*/
@@ -26,4 +29,99 @@ trait Preamble {
implicit def enrichInt(int: Int) = new RichInt(int)
implicit def int2Period(value: Int) = new SimplePeriod(value)
+ implicit def enrichMessage(msg: Message) = new RichMessage(msg)
+
+ implicit def enrichFnAny(f: Exchange => Any) = new ScalaPredicate(f)
+ implicit def enrichAggr(f: (Exchange, Exchange) => Exchange) = new
FnAggregationStrategy(f)
+
+ /**
+ * process { in(classOf[String]) { _+"11" } .toIn }
+ * process { in(classOf[Int]) { 11+ } .toOut }
+ *
+ * process(in(classOf[Event]) {
+ * case event: LoginEvent => doSession(event)
+ * case event: LogoutEvent => removeSession(event)
+ * })
+ */
+ def in[T](clazz: Class[T]) = new BodyExtractor[T](_.getIn.getBody(clazz))
+
+ /**
+ * process { out(classOf[String]) { (s: String) => s+"11" } .toIn }
+ * process { out(classOf[Int]) { _+11 } .toOut }
+ */
+ def out[T](clazz: Class[T]) = new BodyExtractor[T](_.getOut.getBody(clazz))
+
+ /**
+ * filter { in(classOf[Int]) { _ % 2 == 0 } }
+ * filter { out(classOf[String]) { (s: String) => s.startsWith("aa") } }
+ */
+ implicit def wrapperFilter(w: WrappedProcessor) = w.predicate
+
+ trait WrappedProcessor extends Processor {
+ implicit def enrichFnUnit(f: Exchange => Unit) = new ScalaProcessor(f)
+
+ def run(exchange: Exchange): Option[Any]
+
+ def toIn: Processor =
+ (exchange: Exchange) =>
+ run(exchange) foreach {
+ case () => throw new RuntimeTransformException("Cannot save Unit
result into message")
+ case v => exchange.in = v
+ }
+
+ def toOut: Processor =
+ (exchange: Exchange) =>
+ run(exchange) foreach {
+ case () => throw new RuntimeTransformException("Cannot save Unit
result into message")
+ case v => exchange.out = v
+ }
+
+ def predicate: Predicate =
+ (exchange: Exchange) =>
+ run(exchange) map {
+ case () => throw new RuntimeTransformException("Unit result cannot
be used in Predicate")
+ case v => v
+ } getOrElse false
+
+ override def process(exchange: Exchange) {
+ run(exchange) foreach {
+ case () =>
+ case v => exchange.in = v
+ }
+ }
+ }
+
+ class BodyExtractor[T](val get: (Exchange) => T) {
+ def by(f: (T) => Any): WrappedProcessor = new FnProcessor(f)
+
+ /**
+ * process { in(classOf[Event]) collect { case event: LoginEvent =>
doSession(event) } }
+ * filter { in(classOf[Event]) collect { case event: LoginEvent =>
event.isAdmin } }
+ */
+ def collect(pf: PartialFunction[T,Any]): WrappedProcessor = new
PfProcessor(pf)
+
+ def apply(f: (T) => Any): WrappedProcessor = by(f)
+
+ /**
+ * Wrapper for function processor / predicate
+ */
+ class FnProcessor(val f: (T) => Any) extends WrappedProcessor {
+ override def run(exchange: Exchange): Option[Any] =
Some(f(get(exchange)))
+ }
+
+ /**
+ * Wrapper for PartialFunction processor / predicate
+ */
+ class PfProcessor(val pf: PartialFunction[T,Any]) extends WrappedProcessor
{
+ override def run(exchange: Exchange): Option[Any] =
PartialFunction.condOpt(get(exchange))(pf)
+ }
+ }
+
+ /**
+ * Wrapper for (Exchange, Exchange) => Exchange that acts as
AggregationStrategy
+ */
+ class FnAggregationStrategy(aggregator: (Exchange, Exchange) => Exchange)
extends AggregationStrategy {
+ override def aggregate(original: Exchange, resource: Exchange): Exchange =
aggregator(original, resource)
+ }
+
}
Modified:
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala?rev=1460260&r1=1460259&r2=1460260&view=diff
==============================================================================
---
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
(original)
+++
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
Sun Mar 24 03:31:41 2013
@@ -59,7 +59,7 @@ class SMulticastParallelTest extends Mul
if (oldExchange == null) {
newExchange
} else {
- oldExchange.in= oldExchange.in[String] + newExchange.in[String]
+ oldExchange.in = oldExchange.in[String] + newExchange.in[String]
oldExchange
}
}
Added:
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala?rev=1460260&view=auto
==============================================================================
---
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
(added)
+++
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
Sun Mar 24 03:31:41 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.scala
+
+import org.scalatest.FunSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import
org.apache.camel.{Exchange,Processor,Predicate,RuntimeTransformException}
+
+
+@RunWith(classOf[JUnitRunner])
+class RouteSpec extends FunSpec with CamelSpec with MustMatchers with Preamble
{
+ describe("Processor/DSL") {
+ it("should process in") {
+ def f(x: Int): Int = x+1
+ val e = processExchange(in(classOf[Int]) {f _}) { _.in = 1 }
+ e.in[Int] must equal(2)
+ }
+ it("should process in -> in") {
+ val e = processExchange(in(classOf[Int]) {1+} .toIn) { _.in = 1 }
+ e.in must equal(2)
+ }
+ it("should process in -> out") {
+ val e = processExchange(in(classOf[Int]) {1+} .toOut) { _.in = 1 }
+ e.out must equal(2)
+ }
+ it("should process out ->") {
+ val e = processExchange(out(classOf[Int]) {1+}) { _.out = 1 }
+ e.in must equal(2)
+ }
+ it("should process out -> in") {
+ val e = processExchange(out(classOf[Int]) {1+} .toIn) { _.out = 1 }
+ e.in must equal(2)
+ }
+ it("should process out -> out") {
+ val e = processExchange(out(classOf[Int]) {1+} .toOut) { _.out = 1 }
+ e.out must equal(2)
+ }
+ it("should not modify exchange when function returns Unit") {
+ def fn(i: Int) { }
+ val e = processExchange(in(classOf[Int]) {fn _}) { _.in = 1}
+ e.in must equal(1)
+ }
+ it("should raise exception when trying to set In when function returns
Unit") {
+ def fn(i: Int) { }
+ evaluating { processExchange(in(classOf[Int]) {fn _} .toIn) { _.in = 1}
} must produce [RuntimeTransformException]
+ }
+ it("should raise exception when trying to set Out when function returns
Unit") {
+ def fn(i: Int) { }
+ evaluating { processExchange(in(classOf[Int]) {fn _} .toOut) { _.in = 1}
} must produce [RuntimeTransformException]
+ }
+ }
+ describe("Predicate/DSL") {
+ it("should filter in") {
+ filterExchange(in(classOf[Int]) {1==}) { _.in = 1 } must equal(true)
+ }
+ it("should raise exception when trying to filter when function returns
Unit") {
+ def fn(i: Int) { }
+ evaluating { filterExchange(in(classOf[Int]) {fn _}) { _.in = 1} } must
produce [RuntimeTransformException]
+ }
+ }
+ describe("PartialFunction/DSL") {
+ sealed trait AlgoType
+ case object LeafOne extends AlgoType
+ case object LeafTwo extends AlgoType
+
+ it("should leave message body if it's not in function domain") {
+ val p: Processor = in(classOf[AlgoType]) collect {
+ case LeafOne => LeafTwo
+ }
+
+ val e = processExchange(p) { _.in = LeafTwo }
+ e.in[AlgoType] must equal(LeafTwo)
+ }
+ it("should process body if it's in function domain") {
+ val p: Processor = in(classOf[AlgoType]) collect {
+ case LeafOne => LeafTwo
+ }
+
+ val e = processExchange(p) { _.in = LeafOne }
+ e.in[AlgoType] must equal(LeafTwo)
+ }
+ it("should filter") {
+ val p: Predicate = in(classOf[AlgoType]) collect {
+ case LeafOne => true
+ }
+
+ filterExchange(p) { _.in = LeafOne } must equal(true)
+ filterExchange(p) { _.in = LeafTwo } must equal(false)
+ }
+ }
+
+ def processExchange(p: Processor)(pre: Exchange => Unit) = {
+ val e = createExchange
+ pre(e)
+ p.process(e)
+ e
+ }
+
+ def filterExchange(f: Predicate)(pre: Exchange => Unit) = {
+ val e = createExchange
+ pre(e)
+ f.matches(e)
+ }
+}