Author: ningjiang
Date: Sun Mar 24 03:31:41 2013
New Revision: 1460260

URL: http://svn.apache.org/r1460260
Log:
CAMEL-6182 merged some implict converters

Added:
    
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
Modified:
    
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
    
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala

Modified: 
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala?rev=1460260&r1=1460259&r2=1460260&view=diff
==============================================================================
--- 
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
 (original)
+++ 
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/Preamble.scala
 Sun Mar 24 03:31:41 2013
@@ -17,6 +17,9 @@
 package org.apache.camel
 package scala
 
+import processor.aggregate.AggregationStrategy
+
+object Preamble extends Preamble
 /**
  * Trait containing common implicit conversion definitions
  */
@@ -26,4 +29,99 @@ trait Preamble {
   implicit def enrichInt(int: Int) = new RichInt(int)
   implicit def int2Period(value: Int) = new SimplePeriod(value)
 
+  implicit def enrichMessage(msg: Message) = new RichMessage(msg)
+
+  implicit def enrichFnAny(f: Exchange => Any) = new ScalaPredicate(f)
+  implicit def enrichAggr(f: (Exchange, Exchange) => Exchange) = new 
FnAggregationStrategy(f)
+
+  /**
+   * process { in(classOf[String]) { _+"11" } .toIn }
+   * process { in(classOf[Int]) { 11+ } .toOut }
+   *
+   * process(in(classOf[Event]) {
+   *   case event: LoginEvent => doSession(event)
+   *   case event: LogoutEvent => removeSession(event)
+   * })
+   */
+  def in[T](clazz: Class[T]) = new BodyExtractor[T](_.getIn.getBody(clazz))
+
+  /**
+   * process { out(classOf[String]) { (s: String) => s+"11" } .toIn }
+   * process { out(classOf[Int]) { _+11 } .toOut }
+   */
+  def out[T](clazz: Class[T]) = new BodyExtractor[T](_.getOut.getBody(clazz))
+
+  /**
+   * filter { in(classOf[Int]) { _ % 2 == 0 } }
+   * filter { out(classOf[String]) { (s: String) => s.startsWith("aa") } }
+   */
+  implicit def wrapperFilter(w: WrappedProcessor) = w.predicate
+
+  trait WrappedProcessor extends Processor {
+    implicit def enrichFnUnit(f: Exchange => Unit) = new ScalaProcessor(f)
+
+    def run(exchange: Exchange): Option[Any]
+
+    def toIn: Processor =
+      (exchange: Exchange) =>
+        run(exchange) foreach {
+          case () => throw new RuntimeTransformException("Cannot save Unit 
result into message")
+          case v => exchange.in = v
+        }
+
+    def toOut: Processor =
+      (exchange: Exchange) =>
+        run(exchange) foreach {
+          case () => throw new RuntimeTransformException("Cannot save Unit 
result into message")
+          case v => exchange.out = v
+        }
+
+    def predicate: Predicate =
+      (exchange: Exchange) =>
+        run(exchange) map {
+          case () => throw new RuntimeTransformException("Unit result cannot 
be used in Predicate")
+          case v => v
+        } getOrElse false
+
+    override def process(exchange: Exchange) {
+      run(exchange) foreach {
+        case () =>
+        case v => exchange.in = v
+      }
+    }
+  }
+
+  class BodyExtractor[T](val get: (Exchange) => T) {
+    def by(f: (T) => Any): WrappedProcessor = new FnProcessor(f)
+
+    /**
+     * process { in(classOf[Event]) collect { case event: LoginEvent => 
doSession(event) } }
+     * filter { in(classOf[Event]) collect { case event: LoginEvent => 
event.isAdmin } }
+     */
+    def collect(pf: PartialFunction[T,Any]): WrappedProcessor = new 
PfProcessor(pf)
+
+    def apply(f: (T) => Any): WrappedProcessor = by(f)
+
+    /**
+     * Wrapper for function processor / predicate
+     */
+    class FnProcessor(val f: (T) => Any) extends WrappedProcessor {
+      override def run(exchange: Exchange): Option[Any] = 
Some(f(get(exchange)))
+    }
+
+    /**
+     * Wrapper for PartialFunction processor / predicate
+     */
+    class PfProcessor(val pf: PartialFunction[T,Any]) extends WrappedProcessor 
{
+      override def run(exchange: Exchange): Option[Any] = 
PartialFunction.condOpt(get(exchange))(pf)
+    }
+  }
+
+  /**
+   * Wrapper for (Exchange, Exchange) => Exchange that acts as 
AggregationStrategy
+   */
+  class FnAggregationStrategy(aggregator: (Exchange, Exchange) => Exchange) 
extends AggregationStrategy {
+    override def aggregate(original: Exchange, resource: Exchange): Exchange = 
aggregator(original, resource)
+  }
+
 }

Modified: 
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala?rev=1460260&r1=1460259&r2=1460260&view=diff
==============================================================================
--- 
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
 (original)
+++ 
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/MulticastTest.scala
 Sun Mar 24 03:31:41 2013
@@ -59,7 +59,7 @@ class SMulticastParallelTest extends Mul
       if (oldExchange == null) {
         newExchange
       } else {
-        oldExchange.in= oldExchange.in[String] + newExchange.in[String]
+        oldExchange.in = oldExchange.in[String] + newExchange.in[String]
         oldExchange
       }
     }

Added: 
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala?rev=1460260&view=auto
==============================================================================
--- 
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
 (added)
+++ 
camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/routeSpec.scala
 Sun Mar 24 03:31:41 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.scala
+
+import org.scalatest.FunSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import 
org.apache.camel.{Exchange,Processor,Predicate,RuntimeTransformException}
+
+
+@RunWith(classOf[JUnitRunner])
+class RouteSpec extends FunSpec with CamelSpec with MustMatchers with Preamble 
{
+  describe("Processor/DSL") {
+    it("should process in") {
+      def f(x: Int): Int = x+1
+      val e = processExchange(in(classOf[Int]) {f _}) { _.in = 1 }
+      e.in[Int] must equal(2)
+    }
+    it("should process in -> in") {
+      val e = processExchange(in(classOf[Int]) {1+} .toIn) { _.in = 1 }
+      e.in must equal(2)
+    }
+    it("should process in -> out") {
+      val e = processExchange(in(classOf[Int]) {1+} .toOut) { _.in = 1 }
+      e.out must equal(2)
+    }
+    it("should process out ->") {
+      val e = processExchange(out(classOf[Int]) {1+}) { _.out = 1 }
+      e.in must equal(2)
+    }
+    it("should process out -> in") {
+      val e = processExchange(out(classOf[Int]) {1+} .toIn) { _.out = 1 }
+      e.in must equal(2)
+    }
+    it("should process out -> out") {
+      val e = processExchange(out(classOf[Int]) {1+} .toOut) { _.out = 1 }
+      e.out must equal(2)
+    }
+    it("should not modify exchange when function returns Unit") {
+      def fn(i: Int) { }
+      val e = processExchange(in(classOf[Int]) {fn _}) { _.in = 1}
+      e.in must equal(1)
+    }
+    it("should raise exception when trying to set In when function returns 
Unit") {
+      def fn(i: Int) { }
+      evaluating { processExchange(in(classOf[Int]) {fn _} .toIn) { _.in = 1} 
} must produce [RuntimeTransformException]
+    }
+    it("should raise exception when trying to set Out when function returns 
Unit") {
+      def fn(i: Int) { }
+      evaluating { processExchange(in(classOf[Int]) {fn _} .toOut) { _.in = 1} 
} must produce [RuntimeTransformException]
+    }
+  }
+  describe("Predicate/DSL") {
+    it("should filter in") {
+      filterExchange(in(classOf[Int]) {1==}) { _.in = 1 } must equal(true)
+    }
+    it("should raise exception when trying to filter when function returns 
Unit") {
+      def fn(i: Int) { }
+      evaluating { filterExchange(in(classOf[Int]) {fn _}) { _.in = 1} } must 
produce [RuntimeTransformException]
+    }
+  }
+  describe("PartialFunction/DSL") {
+    sealed trait AlgoType
+    case object LeafOne extends AlgoType
+    case object LeafTwo extends AlgoType
+
+    it("should leave message body if it's not in function domain") {
+      val p: Processor = in(classOf[AlgoType]) collect {
+        case LeafOne => LeafTwo
+      }
+
+      val e = processExchange(p) { _.in = LeafTwo }
+      e.in[AlgoType] must equal(LeafTwo)
+    }
+    it("should process body if it's in function domain") {
+      val p: Processor = in(classOf[AlgoType]) collect {
+        case LeafOne => LeafTwo
+      }
+
+      val e = processExchange(p) { _.in = LeafOne }
+      e.in[AlgoType] must equal(LeafTwo)
+    }
+    it("should filter") {
+      val p: Predicate = in(classOf[AlgoType]) collect {
+        case LeafOne => true
+      }
+
+      filterExchange(p) { _.in = LeafOne } must equal(true)
+      filterExchange(p) { _.in = LeafTwo } must equal(false)
+    }
+  }
+
+  def processExchange(p: Processor)(pre: Exchange => Unit) = {
+    val e = createExchange
+    pre(e)
+    p.process(e)
+    e
+  }
+
+  def filterExchange(f: Predicate)(pre: Exchange => Unit) = {
+    val e = createExchange
+    pre(e)
+    f.matches(e)
+  }
+}


Reply via email to