Repository: incubator-gearpump
Updated Branches:
  refs/heads/akka-streams 5c4d60c5b -> 4fe5458f4


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
new file mode 100644
index 0000000..8e7a2df
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.GraphTask.{Index, PortId}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskWrapper}
+
+class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig)
+  extends Task(inputTaskContext, userConf) {
+
+  private val context = inputTaskContext.asInstanceOf[TaskWrapper]
+  protected val outMapping =
+    
portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get)
+  protected val inMapping =
+    
portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get)
+
+  val sizeOfOutPorts = outMapping.keys.size
+  val sizeOfInPorts = inMapping.keys.size
+  
+  private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] 
= {
+    val portToProcessor = processors.zipWithIndex.map{kv =>
+      (kv._2, kv._1)
+    }.toMap
+
+    val processorToIndex = processors.sorted.zipWithIndex.toMap
+
+    val portToIndex = portToProcessor.map{kv =>
+      val (outlet, processorId) = kv
+      val index = processorToIndex(processorId)
+      (outlet, index)
+    }
+    portToIndex
+  }
+
+  def output(outletId: Int, msg: Message): Unit = {
+    context.output(outMapping(outletId), msg)
+  }
+
+  override def onStart(startTime : Instant) : Unit = {}
+
+  override def onStop() : Unit = {}
+}
+
+object GraphTask {
+  val OUT_PROCESSORS = "org.apache.gearpump.akkastream.task.outprocessors"
+  val IN_PROCESSORS = "org.apache.gearpump.akkastream.task.inprocessors"
+
+  type PortId = Int
+  type Index = Int
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
new file mode 100644
index 0000000..29d9c91
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.collection.immutable.VectorBuilder
+import scala.concurrent.duration.FiniteDuration
+
+class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  case object GroupedWithinTrigger
+  val buf: VectorBuilder[T] = new VectorBuilder
+  val timeWindow = 
userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
+  val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)
+
+  override def onNext(msg : Message) : Unit = {
+
+  }
+}
+
+object GroupedWithinTask {
+  val BATCH_SIZE = "BATCH_SIZE"
+  val TIME_WINDOW = "TIME_WINDOW"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
new file mode 100644
index 0000000..837de6b
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class InterleaveTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfInputs = sizeOfInPorts
+  var index = 0
+
+  // TODO access upstream and pull
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfInputs) {
+      index = 0
+    }
+  }
+}
+
+object InterleaveTask {
+  val INPUT_PORTS = "INPUT_PORTS"
+  val SEGMENT_SIZE = "SEGMENT_SIZE"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
new file mode 100644
index 0000000..387116d
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+
+class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
+  implicit val ec = context.system.dispatcher
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    f match {
+      case Some(func) =>
+        val fout = func(data)
+        fout.onComplete(value => {
+          value.foreach(out => {
+            val msg = new Message(out, time)
+            context.output(msg)
+          })
+        })
+      case None =>
+    }
+  }
+}
+
+object MapAsyncTask {
+  val MAPASYNC_FUNC = "MAPASYNC_FUNC"
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
new file mode 100644
index 0000000..2b1cd33
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * SeG the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class MergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
+  val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}
+
+object MergeTask {
+  val EAGER_COMPLETE = "EAGER_COMPLETE"
+  val INPUT_PORTS = "INPUT_PORTS"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
new file mode 100644
index 0000000..1ff9ccd
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(Message(elem, msg.timestamp))
+  }
+}
+
+object SingleSourceTask {
+  val ELEMENT = "ELEMENT"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
new file mode 100644
index 0000000..05011e9
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.util.Timeout
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.RequestMessage
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
+import org.apache.gearpump.util.LogUtil
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+
+/**
+ * Bridge Task when data flow is from remote Gearpump Task to local 
Akka-Stream Module
+ *
+ *
+ * upstream [[Task]] -> [[SinkBridgeTask]]
+ *                         \              Remote Cluster
+ * -------------------------\----------------------
+ *                           \            Local JVM
+ *                            \|
+ *                       Akka Stream [[Subscriber]]
+ *
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
+ */
+class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+  extends Task(taskContext, userConf) {
+  import taskContext.taskId
+
+  val queue = new util.LinkedList[Message]()
+  var subscriber: ActorRef = _
+
+  var request: Int = 0
+
+  override def onStart(startTime : Instant) : Unit = {}
+
+  override def onNext(msg : Message) : Unit = {
+    queue.add(msg)
+    trySendingData()
+  }
+
+  override def onStop() : Unit = {}
+
+  private def trySendingData(): Unit = {
+    if (subscriber != null) {
+      (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
+        subscriber ! msg.msg
+        request -= 1
+      }
+    }
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case RequestMessage(n) =>
+      this.subscriber = sender
+      LOG.info("the downstream has requested " + n + " messages from " + 
subscriber)
+      request += n.toInt
+      trySendingData()
+    case msg =>
+      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
+  }
+}
+
+object SinkBridgeTask {
+
+  case class RequestMessage(number: Int)
+
+  class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, 
appId: Int,
+      processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
+    private val taskId = TaskId(processorId, index = 0)
+    private val LOG = LogUtil.getLogger(getClass)
+
+    private var actor: ActorRef = _
+    import system.dispatcher
+
+    private val task =
+      context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map{container =>
+      // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
+      container.task
+    }
+
+    override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = {
+      this.actor = system.actorOf(Props(new ClientActor(subscriber)))
+      subscriber.onSubscribe(this)
+    }
+
+    override def cancel(): Unit = Unit
+
+    private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+
+    override def request(l: Long): Unit = {
+      task.foreach{ task =>
+        task.tell(RequestMessage(l.toInt), actor)
+      }
+    }
+  }
+
+  class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
+    def receive: Receive = {
+      case result: AnyRef =>
+        subscriber.onNext(result)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
new file mode 100644
index 0000000..b0eda19
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import akka.actor.Actor.Receive
+import org.apache.gearpump.Message
+import 
org.apache.gearpump.akkastream.task.SourceBridgeTask.{AkkaStreamMessage, 
Complete, Error}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
+import org.reactivestreams.{Subscriber, Subscription}
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Bridge Task when data flow is from local Akka-Stream Module to remote 
Gearpump Task
+ *
+ *
+ *
+ *      [[SourceBridgeTask]]   --> downstream [[Task]]
+ *                 /|                Remote Cluster
+ * ---------------/--------------------------------
+ *               /                    Local JVM
+ *    Akka Stream [[org.reactivestreams.Publisher]]
+ *
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
+ */
+class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+  extends Task(taskContext, userConf) {
+  import taskContext.taskId
+
+  override def onStart(startTime : Instant) : Unit = {}
+
+  override def onNext(msg : Message) : Unit = {
+    LOG.info("AkkaStreamSource receiving message " + msg)
+  }
+
+  override def onStop() : Unit = {}
+
+  override def receiveUnManagedMessage: Receive = {
+    case Error(ex) =>
+      LOG.error("the stream has error", ex)
+    case AkkaStreamMessage(msg) =>
+      LOG.info("we have received message from akka stream source: " + msg)
+      taskContext.output(Message(msg, System.currentTimeMillis()))
+    case Complete(description) =>
+      LOG.info("the stream is completed: " + description)
+    case msg =>
+      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
+  }
+}
+
+
+object SourceBridgeTask {
+  case class Error(ex: java.lang.Throwable)
+
+  case class Complete(description: String)
+
+  case class AkkaStreamMessage[T >: AnyRef](msg: T)
+
+  class SourceBridgeTaskClient[T >: AnyRef](ec: ExecutionContext,
+      context: ClientContext, appId: Int, processorId: ProcessorId) extends 
Subscriber[T] {
+    val taskId = TaskId(processorId, 0)
+    var subscription: Subscription = _
+    implicit val dispatcher = ec
+
+    val task = context.askAppMaster[TaskActorRef](appId,
+      LookupTaskActorRef(taskId)).map{container =>
+      // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
+      container.task
+    }
+
+    override def onError(throwable: Throwable): Unit = {
+      task.map(task => task ! Error(throwable))
+    }
+
+    override def onSubscribe(subscription: Subscription): Unit = {
+      // when taskActorRef is resolved, request message from upstream
+      this.subscription = subscription
+      task.map(task => subscription.request(1))
+    }
+
+    override def onComplete(): Unit = {
+      task.map(task => task ! Complete("the upstream is completed"))
+    }
+
+    override def onNext(t: T): Unit = {
+      task.map {task =>
+        task ! AkkaStreamMessage(t)
+      }
+      subscription.request(1)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
new file mode 100644
index 0000000..bf2c14f
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : 
UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val func = userConf.getValue[() => IN => 
Iterable[OUT]](StatefulMapConcatTask.FUNC).get
+  var f: IN => Iterable[OUT] = _
+
+  override def onStart(startTime: Instant) : Unit = {
+    f = func()
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val in: IN = msg.msg.asInstanceOf[IN]
+    val out: Iterable[OUT] = f(in)
+    val iterator = out.iterator
+    while(iterator.hasNext) {
+      val nextValue = iterator.next
+      context.output(Message(nextValue, System.currentTimeMillis()))
+    }
+  }
+}
+
+object StatefulMapConcatTask {
+  val FUNC = "FUNC"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
new file mode 100644
index 0000000..ef43fbe
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object TakeWithinTimeout
+
+class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val timeout = userConf.getValue[FiniteDuration](TakeWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var timeoutActive = false
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        timeoutActive = true
+      case _ =>
+
+    }
+    timeoutActive match {
+      case true =>
+      case false =>
+        context.output(msg)
+    }
+  }
+}
+
+object TakeWithinTask {
+  val TIMEOUT = "TIMEOUT"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
new file mode 100644
index 0000000..4e09bf2
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val cost = userConf.getInt(ThrottleTask.COST).getOrElse(0)
+  val costCalc = userConf.getValue[T => Int](ThrottleTask.COST_CALC)
+  val maxBurst = userConf.getInt(ThrottleTask.MAX_BURST)
+  val timePeriod = userConf.getValue[FiniteDuration](ThrottleTask.TIME_PERIOD).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  val interval = timePeriod.toNanos / cost
+
+  // TODO control rate from TaskActor
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[T]
+    val time = msg.timestamp
+    context.output(msg)
+  }
+}
+
+object ThrottleTask {
+  val COST = "COST"
+  val COST_CALC = "COST_CAL"
+  val MAX_BURST = "MAX_BURST"
+  val TIME_PERIOD = "TIME_PERIOD"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
new file mode 100644
index 0000000..b3850ca
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val initialDelay = 
userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  (TickSourceTask.INITIAL_DELAY)
+  val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  val tick = userConf.getValue[T](TickSourceTask.TICK).get
+
+  override def onStart(startTime: Instant): Unit = {
+    context.schedule(initialDelay, interval)(
+      self ! Message(tick, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}
+
+object TickSourceTask {
+  val INITIAL_DELAY = "INITIAL_DELAY"
+  val INTERVAL = "INTERVAL"
+  val TICK = "TICK"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
new file mode 100644
index 0000000..99f1b55
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.akkastream.task.Unzip2Task.UnZipFunction
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val unzip = userConf.
+    getValue[UnZipFunction[In, A1, 
A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
+
+  override def onNext(msg : Message) : Unit = {
+    val message = msg.msg
+    val time = msg.timestamp
+    val pair = unzip(message.asInstanceOf[In])
+    val (a, b) = pair
+    output(0, Message(a.asInstanceOf[AnyRef], time))
+    output(1, Message(b.asInstanceOf[AnyRef], time))
+  }
+}
+
+object Unzip2Task {
+  case class UnZipFunction[In, A1, A2](val unzip: In => (A1, A2)) extends 
Serializable
+
+  val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
new file mode 100644
index 0000000..a35b133
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.Zip2Task.ZipFunction
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val zip = userConf.
+    getValue[ZipFunction[A1, A2, 
OUT]](Zip2Task.ZIP2_FUNCTION)(context.system).get.zip
+  var a1: Option[A1] = None
+  var a2: Option[A2] = None
+
+  override def onNext(msg : Message) : Unit = {
+    val message = msg.msg
+    val time = msg.timestamp
+    a1 match {
+      case Some(x) =>
+        a2 = Some(message.asInstanceOf[A2])
+        a1.foreach(v1 => {
+          a2.foreach(v2 => {
+            val out = zip(v1, v2)
+            context.output(Message(out.asInstanceOf[OUT], time))
+
+          })
+        })
+      case None =>
+        a1 = Some(message.asInstanceOf[A1])
+    }
+  }
+}
+
+object Zip2Task {
+  case class ZipFunction[A1, A2, OUT](val zip: (A1, A2) => OUT) extends 
Serializable
+
+  val ZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.zip2.function"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
new file mode 100644
index 0000000..c9fe67d
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.util
+
+import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, 
MaterializedValueNode, Module, Transform}
+
+class MaterializedValueOps(mat: MaterializedValueNode) {
+  def resolve[Mat](materializedValues: scala.collection.mutable.Map[Module, 
Any]): Mat = {
+    def resolveMaterialized(mat: MaterializedValueNode,
+        materializedValues: scala.collection.mutable.Map[Module, Any]): Any = 
mat match {
+      case Atomic(m) => materializedValues.getOrElse(m, ())
+      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+        resolveMaterialized(d2, materializedValues))
+      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+      case Ignore => ()
+    }
+    resolveMaterialized(mat, materializedValues).asInstanceOf[Mat]
+  }
+}
+
+object MaterializedValueOps{
+  def apply(mat: MaterializedValueNode): MaterializedValueOps = new 
MaterializedValueOps(mat)
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
 
b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
deleted file mode 100644
index 4ead839..0000000
--- 
a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump
-
-import akka.stream.Attributes
-import org.scalatest.{FlatSpec, Matchers}
-
-class AttributesSpec extends FlatSpec with Matchers {
-  it should "merge the attributes together" in {
-    val a = Attributes.name("aa")
-    val b = Attributes.name("bb")
-
-    val c = a and b
-
-    assert("aa-bb" == c.nameOrDefault())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
 
b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
new file mode 100644
index 0000000..3731d41
--- /dev/null
+++ 
b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import akka.stream.Attributes
+import org.scalatest.{FlatSpec, Matchers}
+
+class AttributesSpec extends FlatSpec with Matchers {
+  it should "merge the attributes together" in {
+    val a = Attributes.name("aa")
+    val b = Attributes.name("bb")
+
+    val c = a and b
+
+    assert("aa-bb" == c.nameOrDefault())
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 17e78df..0b1628e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -35,7 +35,8 @@ object Build extends sbt.Build {
 
   val copySharedSourceFiles = TaskKey[Unit]("copied shared services source 
code")
 
-  val akkaVersion = "2.4.3"
+  val akkaVersion = "2.4.10"
+  val akkaStreamVersion = "2.4-SNAPSHOT"
   val apacheRepo = "https://repository.apache.org/";
   val hadoopVersion = "2.6.0"
   val hbaseVersion = "1.0.0"
@@ -145,9 +146,12 @@ object Build extends sbt.Build {
       "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
       "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
       "commons-logging" % "commons-logging" % commonsLoggingVersion,
-      "com.typesafe.akka" %% "akka-distributed-data-experimental" % 
akkaVersion,
+      "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
+        exclude("com.typesafe.akka", "akka-stream_2.11"),
+      "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
-    )
+    ),
+    dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % 
akkaStreamVersion
   )
 
   val streamingDependencies = Seq(
@@ -186,7 +190,8 @@ object Build extends sbt.Build {
       "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
       "org.scala-lang" % "scala-reflect" % scalaVersionNumber,
       "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
-      "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
+      "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
+        exclude("com.typesafe.akka", "akka-stream_2.11"),
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
       "org.mockito" % "mockito-core" % mockitoVersion % "test",
@@ -250,7 +255,7 @@ object Build extends sbt.Build {
     base = file("."),
     settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting)
       .aggregate(shaded, core, daemon, streaming, services, external_kafka, 
external_monoid,
-      external_serializer, examples, storm, yarn, external_hbase, packProject,
+      external_serializer, examples, akkastream, storm, yarn, external_hbase, 
packProject,
       external_hadoopfs, integration_test).settings(Defaults.itSettings: _*)
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
@@ -314,14 +319,17 @@ object Build extends sbt.Build {
 
   lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
     libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test",
+      "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test"
+        exclude("com.typesafe.akka", "akka-stream_2.11"),
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "com.lihaoyi" %% "upickle" % upickleVersion,
       "com.softwaremill.akka-http-session" %% "core" % "0.2.5",
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion
+        exclude("com.typesafe.akka", "akka-stream_2.11"),
       "com.github.scribejava" % "scribejava-apis" % "2.4.0",
       "com.ning" % "async-http-client" % "1.9.33",
       "org.webjars" % "angularjs" % "1.4.9",
+      "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
 
       // angular 1.5 breaks ui-select, but we need ng-touch 1.5
       "org.webjars.npm" % "angular-touch" % "1.5.0",
@@ -382,14 +390,17 @@ object Build extends sbt.Build {
   lazy val akkastream = Project(
     id = "gearpump-experiments-akkastream",
     base = file("experiments/akkastream"),
-    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+    settings = commonSettings ++ noPublish ++
       Seq(
         libraryDependencies ++= Seq(
-          "org.json4s" %% "json4s-jackson" % "3.2.11"
+          "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
+          "org.json4s" %% "json4s-jackson" % "3.2.11",
+          "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
         ),
-        mainClass in(Compile, packageBin) := 
Some("akka.stream.gearpump.example.Test")
-      ))
-      .dependsOn(streaming % "test->test; provided", daemon % "test->test; 
provided")
+        dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % 
akkaStreamVersion
+      )) 
+      .dependsOn (services % "test->test; compile->compile", daemon % 
"test->test; compile->compile")
+      .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val storm = Project(
     id = "gearpump-experiments-storm",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 1c87653..47d3064 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -69,7 +69,8 @@ object Pack extends sbt.Build {
           "worker" -> "org.apache.gearpump.cluster.main.Worker",
           "services" -> "org.apache.gearpump.services.main.Services",
           "yarnclient" -> "org.apache.gearpump.experiments.yarn.client.Client",
-          "storm" -> "org.apache.gearpump.experiments.storm.StormRunner"
+          "storm" -> "org.apache.gearpump.experiments.storm.StormRunner",
+          "akkastream" -> "org.apache.gearpump.akkastream.example.Test11"
         ),
         packJvmOpts := Map(
           "gear" -> Seq("-Djava.net.preferIPv4Stack=true", 
"-Dgearpump.home=${PROG_HOME}"),
@@ -109,7 +110,13 @@ object Pack extends sbt.Build {
           "storm" -> Seq(
             "-server",
             "-Djava.net.preferIPv4Stack=true",
-            "-Dgearpump.home=${PROG_HOME}")
+            "-Dgearpump.home=${PROG_HOME}"),
+
+          "akkastream" -> Seq(
+            "-server",
+            "-Djava.net.preferIPv4Stack=true",
+            "-Dgearpump.home=${PROG_HOME}",
+            "-Djava.rmi.server.hostname=localhost")
         ),
         packLibDir := Map(
           "lib" -> new ProjectsToPack(core.id, streaming.id),
@@ -141,13 +148,14 @@ object Pack extends sbt.Build {
           "worker" -> daemonClassPath,
           "services" -> serviceClassPath,
           "yarnclient" -> yarnClassPath,
-          "storm" -> stormClassPath
+          "storm" -> stormClassPath,
+          "akkstream" -> daemonClassPath
         ),
 
         packArchivePrefix := projectName + "-" + scalaBinaryVersion.value,
         packArchiveExcludes := Seq("integrationtest")
 
       )
-  ).dependsOn(core, streaming, services, yarn, storm).
+  ).dependsOn(core, streaming, services, yarn, storm, akkastream).
     disablePlugins(sbtassembly.AssemblyPlugin)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/project/scalastyle_config.xml
----------------------------------------------------------------------
diff --git a/project/scalastyle_config.xml b/project/scalastyle_config.xml
new file mode 100644
index 0000000..1b0a838
--- /dev/null
+++ b/project/scalastyle_config.xml
@@ -0,0 +1,240 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!--
+
+If you wish to turn off checking for a section of code, you can put a comment 
in the source
+before and after the section, with the following syntax:
+
+  // scalastyle:off
+  ...  // stuff that breaks the styles
+  // scalastyle:on
+
+You can also disable only one rule, by specifying its rule id, as specified in:
+  http://www.scalastyle.org/rules-0.8.0.html
+
+  // scalastyle:off no.finalize
+  override def finalize(): Unit = ...
+  // scalastyle:on no.finalize
+
+This file is divided into 3 sections:
+ (1) rules that we enforce.
+ (2) rules that we would like to enforce, but haven't cleaned up the codebase 
to turn on yet
+     (or we need to make the scalastyle rule more configurable).
+ (3) rules that we don't want to enforce.
+-->
+
+<scalastyle>
+  <name>Scalastyle standard configuration</name>
+
+  <check level="error" class="org.scalastyle.file.FileTabChecker" 
enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" 
enabled="true">
+    <parameters>
+      <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.SpacesAfterPlusChecker"
+         enabled="true"></check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.SpacesBeforePlusChecker"
+         enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.FileLineLengthChecker" 
enabled="true">
+    <parameters>
+      <parameter name="maxLineLength"><![CDATA[100]]></parameter>
+      <parameter name="tabSize"><![CDATA[2]]></parameter>
+      <parameter name="ignoreImports">true</parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" 
enabled="true">
+    <parameters>
+      <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" 
enabled="true">
+    <parameters>
+      <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+    <parameters>
+      <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+    <parameters>
+      <parameter name="maxParameters"><![CDATA[10]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" 
enabled="true"></check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.CovariantEqualsChecker"
+         enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker"
+         enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" 
enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" 
enabled="true">
+    <parameters>
+      <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+      <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker"
+         enabled="true"></check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.NonASCIICharacterChecker"
+         enabled="true"></check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker"
+         enabled="true"></check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, 
LARROW, RARROW</parameter>
+    </parameters>
+  </check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, 
WHILE, FOR, MATCH, TRY,
+        CATCH, FINALLY, LARROW, RARROW
+      </parameter>
+    </parameters>
+  </check>
+
+  <!-- ??? usually shouldn't be checked into the code base. -->
+  <check level="error" 
class="org.scalastyle.scalariform.NotImplementedErrorUsage"
+         enabled="true"></check>
+
+  <check customId="println" level="error" 
class="org.scalastyle.scalariform.TokenChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="regex">^println$</parameter>
+    </parameters>
+    <customMessage><![CDATA[Are you sure you want to println? If yes, wrap the 
code block with
+      // scalastyle:off println
+      println(...)
+      // scalastyle:on println]]></customMessage>
+  </check>
+
+  <check customId="runtimeaddshutdownhook" level="error" 
class="org.scalastyle.file.RegexChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter>
+    </parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In 
most cases, you should use
+      ShutdownHookManager.addShutdownHook instead.
+      If you must use Runtime.getRuntime.addShutdownHook, wrap the code block 
with
+      // scalastyle:off runtimeaddshutdownhook
+      Runtime.getRuntime.addShutdownHook(...)
+      // scalastyle:on runtimeaddshutdownhook
+    ]]></customMessage>
+  </check>
+
+  <check customId="mutablesynchronizedbuffer" level="error" 
class="org.scalastyle.file.RegexChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="regex">mutable\.SynchronizedBuffer</parameter>
+    </parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use mutable.SynchronizedBuffer? In most 
cases, you should use
+      java.util.concurrent.ConcurrentLinkedQueue instead.
+      If you must use mutable.SynchronizedBuffer, wrap the code block with
+      // scalastyle:off mutablesynchronizedbuffer
+      mutable.SynchronizedBuffer[...]
+      // scalastyle:on mutablesynchronizedbuffer
+    ]]></customMessage>
+  </check>
+
+  <check customId="javaconversions" level="error" 
class="org.scalastyle.scalariform.TokenChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="regex">JavaConversions</parameter>
+    </parameters>
+    <customMessage>Instead of importing implicits in 
scala.collection.JavaConversions._, import
+      scala.collection.JavaConverters._ and use .asScala / .asJava methods
+    </customMessage>
+  </check>
+
+  <check level="error" 
class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker"
+         enabled="true">
+    <parameters>
+      <parameter name="tokens">COMMA</parameter>
+    </parameters>
+  </check>
+
+  <!-- Should add single Space between ')' and '{' -->
+  <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error"
+         class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters>
+      <parameter name="regex">\)\{</parameter>
+    </parameters>
+    <customMessage><![CDATA[
+      Single Space between ')' and `{`.
+    ]]></customMessage>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" 
enabled="false">
+    <parameters>
+      <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker"
+         enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.IndentationChecker" 
enabled="true">
+    <parameters>
+      <parameter name="tabSize">2</parameter>
+      <parameter name="methodParamIndentSize">4</parameter>
+    </parameters>
+  </check>
+
+  <!-- Don't allow return -->
+  <check level="error" class="org.scalastyle.scalariform.ReturnChecker" 
enabled="true"></check>
+
+</scalastyle>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
deleted file mode 100644
index 1b0a838..0000000
--- a/scalastyle-config.xml
+++ /dev/null
@@ -1,240 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<!--
-
-If you wish to turn off checking for a section of code, you can put a comment 
in the source
-before and after the section, with the following syntax:
-
-  // scalastyle:off
-  ...  // stuff that breaks the styles
-  // scalastyle:on
-
-You can also disable only one rule, by specifying its rule id, as specified in:
-  http://www.scalastyle.org/rules-0.8.0.html
-
-  // scalastyle:off no.finalize
-  override def finalize(): Unit = ...
-  // scalastyle:on no.finalize
-
-This file is divided into 3 sections:
- (1) rules that we enforce.
- (2) rules that we would like to enforce, but haven't cleaned up the codebase 
to turn on yet
-     (or we need to make the scalastyle rule more configurable).
- (3) rules that we don't want to enforce.
--->
-
-<scalastyle>
-  <name>Scalastyle standard configuration</name>
-
-  <check level="error" class="org.scalastyle.file.FileTabChecker" 
enabled="true"></check>
-
-  <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" 
enabled="true">
-    <parameters>
-      <parameter name="header"><![CDATA[/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.SpacesAfterPlusChecker"
-         enabled="true"></check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.SpacesBeforePlusChecker"
-         enabled="true"></check>
-
-  <check level="error" class="org.scalastyle.file.FileLineLengthChecker" 
enabled="true">
-    <parameters>
-      <parameter name="maxLineLength"><![CDATA[100]]></parameter>
-      <parameter name="tabSize"><![CDATA[2]]></parameter>
-      <parameter name="ignoreImports">true</parameter>
-    </parameters>
-  </check>
-
-  <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" 
enabled="true">
-    <parameters>
-      <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" 
enabled="true">
-    <parameters>
-      <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
-    <parameters>
-      <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
-    <parameters>
-      <parameter name="maxParameters"><![CDATA[10]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" 
enabled="true"></check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.CovariantEqualsChecker"
-         enabled="true"></check>
-
-  <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker"
-         enabled="true"></check>
-
-  <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" 
enabled="true"></check>
-
-  <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" 
enabled="true">
-    <parameters>
-      <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
-      <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker"
-         enabled="true"></check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.NonASCIICharacterChecker"
-         enabled="true"></check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker"
-         enabled="true"></check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, 
LARROW, RARROW</parameter>
-    </parameters>
-  </check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, 
WHILE, FOR, MATCH, TRY,
-        CATCH, FINALLY, LARROW, RARROW
-      </parameter>
-    </parameters>
-  </check>
-
-  <!-- ??? usually shouldn't be checked into the code base. -->
-  <check level="error" 
class="org.scalastyle.scalariform.NotImplementedErrorUsage"
-         enabled="true"></check>
-
-  <check customId="println" level="error" 
class="org.scalastyle.scalariform.TokenChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="regex">^println$</parameter>
-    </parameters>
-    <customMessage><![CDATA[Are you sure you want to println? If yes, wrap the 
code block with
-      // scalastyle:off println
-      println(...)
-      // scalastyle:on println]]></customMessage>
-  </check>
-
-  <check customId="runtimeaddshutdownhook" level="error" 
class="org.scalastyle.file.RegexChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter>
-    </parameters>
-    <customMessage><![CDATA[
-      Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In 
most cases, you should use
-      ShutdownHookManager.addShutdownHook instead.
-      If you must use Runtime.getRuntime.addShutdownHook, wrap the code block 
with
-      // scalastyle:off runtimeaddshutdownhook
-      Runtime.getRuntime.addShutdownHook(...)
-      // scalastyle:on runtimeaddshutdownhook
-    ]]></customMessage>
-  </check>
-
-  <check customId="mutablesynchronizedbuffer" level="error" 
class="org.scalastyle.file.RegexChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="regex">mutable\.SynchronizedBuffer</parameter>
-    </parameters>
-    <customMessage><![CDATA[
-      Are you sure that you want to use mutable.SynchronizedBuffer? In most 
cases, you should use
-      java.util.concurrent.ConcurrentLinkedQueue instead.
-      If you must use mutable.SynchronizedBuffer, wrap the code block with
-      // scalastyle:off mutablesynchronizedbuffer
-      mutable.SynchronizedBuffer[...]
-      // scalastyle:on mutablesynchronizedbuffer
-    ]]></customMessage>
-  </check>
-
-  <check customId="javaconversions" level="error" 
class="org.scalastyle.scalariform.TokenChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="regex">JavaConversions</parameter>
-    </parameters>
-    <customMessage>Instead of importing implicits in 
scala.collection.JavaConversions._, import
-      scala.collection.JavaConverters._ and use .asScala / .asJava methods
-    </customMessage>
-  </check>
-
-  <check level="error" 
class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker"
-         enabled="true">
-    <parameters>
-      <parameter name="tokens">COMMA</parameter>
-    </parameters>
-  </check>
-
-  <!-- Should add single Space between ')' and '{' -->
-  <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error"
-         class="org.scalastyle.file.RegexChecker" enabled="true">
-    <parameters>
-      <parameter name="regex">\)\{</parameter>
-    </parameters>
-    <customMessage><![CDATA[
-      Single Space between ')' and `{`.
-    ]]></customMessage>
-  </check>
-
-  <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" 
enabled="false">
-    <parameters>
-      <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
-    </parameters>
-  </check>
-
-  <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker"
-         enabled="true"></check>
-
-  <check level="error" class="org.scalastyle.file.IndentationChecker" 
enabled="true">
-    <parameters>
-      <parameter name="tabSize">2</parameter>
-      <parameter name="methodParamIndentSize">4</parameter>
-    </parameters>
-  </check>
-
-  <!-- Don't allow return -->
-  <check level="error" class="org.scalastyle.scalariform.ReturnChecker" 
enabled="true"></check>
-
-</scalastyle>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
 
b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
index b217363..46e16cf 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -149,7 +149,7 @@ class AppMasterService(val master: ActorRef,
             }
           }
       } ~
-      path("metrics" / RestPath) { path =>
+      path("metrics" / RemainingPath) { path =>
         parameterMap { optionMap =>
           parameter("aggregator" ? "") { aggregator =>
             parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index bf7092e..a763be6 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -102,7 +102,7 @@ class MasterService(val master: ActorRef,
           failWith(ex)
       }
     } ~
-    path("metrics" / RestPath) { path =>
+    path("metrics" / RemainingPath) { path =>
       parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { 
readOption: String =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)
         onComplete(askActor[HistoryMetrics](master, query)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
 
b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
index 804b34f..8ae8dbe 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
@@ -60,8 +60,8 @@ import org.apache.gearpump.services.util.UpickleUtil._
 class SecurityService(inner: RouteService, implicit val system: ActorSystem) 
extends RouteService {
 
   // Use scheme "GearpumpBasic" to avoid popping up web browser native 
authentication box.
-  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = 
"gearpump",
-    params = Map.empty)
+  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = 
Some("gearpump"),
+    params = Map.empty[String,String])
 
   val LOG = LogUtil.getLogger(getClass, "AUDIT")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
index 284d3f2..7b33987 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -19,10 +19,12 @@
 package org.apache.gearpump.services
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.marshalling.ToResponseMarshallable
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.marshalling.ToResponseMarshallable._
+import akka.http.scaladsl.server.{RejectionHandler, StandardRoute}
 import akka.stream.Materializer
-
 import org.apache.gearpump.util.Util
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -56,14 +58,14 @@ class StaticService(override val system: ActorSystem, 
supervisorPath: String)
       getFromResource("index.html")
     } ~
     path("favicon.ico") {
-      complete(StatusCodes.NotFound)
+      complete(ToResponseMarshallable(StatusCodes.NotFound))
     } ~
     pathPrefix("webjars") {
       get {
         getFromResourceDirectory("META-INF/resources/webjars")
       }
     } ~
-    path(Rest) { path =>
+    path(Remaining) { path =>
       getFromResource("%s" format path)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
index 8268d61..954fe97 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
@@ -63,7 +63,7 @@ class WorkerService(val master: ActorRef, override val 
system: ActorSystem)
           failWith(ex)
       }
     } ~
-    path("metrics" / RestPath ) { path =>
+    path("metrics" / RemainingPath ) { path =>
       val workerId = WorkerId.parse(workerIdString)
       parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
index 8de291c..b09d9b9 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -64,7 +64,7 @@ class OpTranslator extends java.io.Serializable {
               userConfig)
           case ProcessorOp(processor, parallelism, conf, description) =>
             DefaultProcessor(parallelism,
-              description = description + "." + func.description,
+              description = description + " " + func.description,
               userConfig, processor)
           case DataSinkOp(dataSink, parallelism, conf, description) =>
             DataSinkProcessor(dataSink, parallelism, description + 
func.description)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index c0b6a29..eb52700 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -23,8 +23,6 @@ import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
-import org.apache.gearpump.streaming.source.{Watermark, DataSourceTask}
-import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import 
org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
 import org.apache.gearpump.metrics.Metrics
@@ -32,8 +30,10 @@ import org.apache.gearpump.serializer.SerializationFramework
 import org.apache.gearpump.streaming.AppMasterToExecutor._
 import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
 import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
 
 /**
  *
@@ -52,16 +52,15 @@ class TaskActor(
 
   def serializerPool: SerializationFramework = inputSerializerPool
 
-  import taskContextData._
-
   import org.apache.gearpump.streaming.Constants._
   import org.apache.gearpump.streaming.task.TaskActor._
+  import taskContextData._
   val config = context.system.settings.config
 
   val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = 
executorId, task = taskId)
 
   // Metrics
-  private val metricName = 
s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
+  private val metricName = 
s"app$appId.processor${taskId.processorId}.task${taskId.index}"
   private val receiveLatency = Metrics(context.system).histogram(
     s"$metricName:receiveLatency", sampleRate = 1)
   private val processTime = 
Metrics(context.system).histogram(s"$metricName:processTime")
@@ -76,9 +75,9 @@ class TaskActor(
   private var life = taskContextData.life
 
   // Latency probe
-  import scala.concurrent.duration._
-
   import context.dispatcher
+
+  import scala.concurrent.duration._
   final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
 
   // Clock report interval

Reply via email to