http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
 
b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
new file mode 100644
index 0000000..610f0c7
--- /dev/null
+++ 
b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpEntity.Strict
+import akka.http.scaladsl.model.MediaTypes._
+import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import com.typesafe.config.ConfigFactory
+import org.scalatest.FlatSpec
+
+import org.apache.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import 
org.apache.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
+import 
org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator
+
+class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
+
+  implicit val actorSystem: ActorSystem = system
+  private val server = new MockOAuth2Server(system, null)
+  server.start()
+  private val serverHost = s"http://127.0.0.1:${server.port}";
+
+  val configMap = Map(
+    "class" -> 
"org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator",
+    "callback" -> s"$serverHost/login/oauth2/google/callback",
+    "clientid" -> 
"170234147043-a1tag68jtq6ab4bi11jvsj7vbaqcmhkt.apps.googleusercontent.com",
+    "clientsecret" -> "ioeWLLDipz2S7aTDXym2-obe",
+    "default-userrole" -> "guest",
+    "icon" -> "/icons/google.png")
+
+  val configString = ConfigFactory.parseMap(configMap.asJava)
+
+  private lazy val google = {
+    val google = new MockGoogleAuthenticator(serverHost)
+    google.init(configString, system.dispatcher)
+    google
+  }
+
+  it should "generate the correct authorization request" in {
+    val parameters = Uri(google.getAuthorizationUrl()).query().toMap
+    assert(parameters("response_type") == "code")
+    assert(parameters("client_id") == configMap("clientid"))
+    assert(parameters("redirect_uri") == configMap("callback"))
+    assert(parameters("scope") == GoogleOAuth2Authenticator.Scope)
+  }
+
+  it should "authenticate the authorization code and return the correct 
profile" in {
+    val code = Map("code" -> "4/PME0pfxjiBA42SukR-OTGl7fpFzTWzvZPf1TbkpXL4M#")
+    val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
+    val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
+    val mail = "[email protected]"
+
+    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
+
+      assert(request.entity.contentType.mediaType.value == 
"application/x-www-form-urlencoded")
+
+      val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+      val form = Uri./.withQuery(Query(body)).query().toMap
+
+      assert(form("client_id") == configMap("clientid"))
+      assert(form("client_secret") == configMap("clientsecret"))
+      assert(form("grant_type") == "authorization_code")
+      assert(form("code") == code("code"))
+      assert(form("redirect_uri") == configMap("callback"))
+      assert(form("scope") == GoogleOAuth2Authenticator.Scope)
+
+      // scalastyle:off line.size.limit
+      val response =
+        s"""
+        |{
+        | "access_token": "$accessToken",
+        | "token_type": "Bearer",
+        | "expires_in": 3591,
+        | "id_token": 
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
+        |}
+        """.stripMargin
+      // scalastyle:on line.size.limit
+
+      HttpResponse(entity = HttpEntity(ContentType(`application/json`), 
response))
+    }
+
+    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
+      assert(request.getUri().query().get("access_token").get == accessToken)
+      val response =
+        s"""
+        |{
+        |   "kind": "plus#person",
+        |   "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
+        |   "nickname": "gearpump",
+        |   "gender": "female",
+        |   "emails": [
+        |     {
+        |       "value": "$mail",
+        |       "type": "account"
+        |     }
+        |   ]
+        | }
+        """.stripMargin
+      HttpResponse(entity = HttpEntity(ContentType(`application/json`), 
response))
+    }
+
+    server.requestHandler = (request: HttpRequest) => {
+      if (request.uri.path.startsWith(Path("/oauth2/v4/token"))) {
+        accessTokenEndpoint(request)
+      } else if (request.uri.path.startsWith(Path("/plus/v1/people/me"))) {
+        protectedResourceEndpoint(request)
+      } else {
+        fail("Unexpected access to " + request.uri.toString())
+      }
+    }
+
+    val userFuture = google.authenticate(code)
+    val user = Await.result(userFuture, 30.seconds)
+    assert(user.user == mail)
+    assert(user.permissionLevel == Authenticator.Guest.permissionLevel)
+  }
+
+  override def cleanUp(): Unit = {
+    server.stop()
+    google.close()
+    super.cleanUp()
+  }
+}
+
+object GoogleOAuth2AuthenticatorSpec {
+  class MockGoogleAuthenticator(host: String) extends 
GoogleOAuth2Authenticator {
+    protected override def authorizeUrl: String = {
+      super.authorizeUrl.replace("https://accounts.google.com";, host)
+    }
+
+    protected override def accessTokenEndpoint: String = {
+      super.accessTokenEndpoint.replace("https://www.googleapis.com";, host)
+    }
+
+    protected override def protectedResourceUrl: String = {
+      super.protectedResourceUrl.replace("https://www.googleapis.com";, host)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala
 
b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala
new file mode 100644
index 0000000..d656361
--- /dev/null
+++ 
b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2
+
+import scala.concurrent.{Await, Future}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.ServerBinding
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Sink
+
+import org.apache.gearpump.util.Util
+// NOTE: This cannot be removed!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * Serves as a fake OAuth2 server.
+ */
+class MockOAuth2Server(
+    actorSystem: ActorSystem,
+    var requestHandler: HttpRequest => HttpResponse) {
+
+  implicit val system: ActorSystem = actorSystem
+  implicit val materializer = ActorMaterializer()
+  implicit val ec = system.dispatcher
+
+  private var _port: Int = 0
+  private var bindingFuture: Future[ServerBinding] = null
+
+  def port: Int = _port
+
+  def start(): Unit = {
+    _port = Util.findFreePort().get
+
+    val serverSource = Http().bind(interface = "127.0.0.1", port = _port)
+    bindingFuture = {
+      serverSource.to(Sink.foreach { connection =>
+        connection handleWithSyncHandler requestHandler
+      }).run()
+    }
+  }
+
+  def stop(): Unit = {
+    import scala.concurrent.duration._
+    Await.result(bindingFuture.map(_.unbind()), 120.seconds)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
 
b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
new file mode 100644
index 0000000..2bf6b6b
--- /dev/null
+++ 
b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.util
+
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import upickle.default.{read, write}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.metrics.Metrics.{Counter, MetricType}
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, 
StreamAppMasterSummary}
+import org.apache.gearpump.util.Graph
+
+class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+  "UserConfig" should "serialize and deserialize with upickle correctly" in {
+    val conf = UserConfig.empty.withString("key", "value")
+    val serialized = write(conf)
+    val deserialized = read[UserConfig](serialized)
+    assert(deserialized.getString("key") == Some("value"))
+  }
+
+  "Graph" should "be able to serialize/deserialize correctly" in {
+    val graph = new Graph[Int, String](List(0, 1), List((0, "edge", 1)))
+    val serialized = write(graph)
+
+    val deserialized = read[Graph[Int, String]](serialized)
+
+    graph.vertices.toSet shouldBe deserialized.vertices.toSet
+    graph.edges.toSet shouldBe deserialized.edges.toSet
+  }
+
+  "MetricType" should "be able to serialize/deserialize correctly" in {
+    val metric: MetricType = Counter("counter", 100L)
+    val serialized = write(metric)
+    val deserialized = read[MetricType](serialized)
+    metric shouldBe deserialized
+  }
+
+  "StreamingAppMasterDataDetail" should "serialize and deserialize with 
upickle correctly" in {
+    val app = new StreamAppMasterSummary(appId = 0,
+      processors = Map.empty[ProcessorId, ProcessorSummary],
+      processorLevels = Map.empty[ProcessorId, Int]
+    )
+
+    val serialized = write(app)
+    val deserialized = read[StreamAppMasterSummary](serialized)
+    assert(deserialized == app)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
deleted file mode 100644
index dbae1c4..0000000
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi;
-
-import io.gearpump.partitioner.Partitioner;
-import io.gearpump.streaming.Processor;
-import io.gearpump.streaming.task.Task;
-
-/**
- * Java version of Graph
- *
- * See {@link io.gearpump.util.Graph}
- */
-public class Graph extends io.gearpump.util.Graph<Processor<? extends Task>, 
Partitioner> {
-
-  public Graph() {
-    super(null, null);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
deleted file mode 100644
index 974183e..0000000
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi;
-
-import akka.actor.ActorSystem;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.streaming.sink.DataSink;
-import io.gearpump.streaming.sink.DataSinkProcessor;
-import io.gearpump.streaming.sink.DataSinkTask;
-import io.gearpump.streaming.source.DataSource;
-import io.gearpump.streaming.source.DataSourceProcessor;
-import io.gearpump.streaming.source.DataSourceTask;
-
-/**
- * Java version of Processor
- *
- * See {@link io.gearpump.streaming.Processor}
- */
-public class Processor<T extends io.gearpump.streaming.task.Task> implements 
io.gearpump.streaming.Processor<T> {
-  private Class<T> _taskClass;
-  private int _parallelism = 1;
-  private String _description = "";
-  private UserConfig _userConf = UserConfig.empty();
-
-  public Processor(Class<T> taskClass) {
-    this._taskClass = taskClass;
-  }
-
-  public Processor(Class<T> taskClass, int parallelism) {
-    this._taskClass = taskClass;
-    this._parallelism = parallelism;
-  }
-
-  /**
-   * Creates a Sink Processor
-   *
-   * @param dataSink    the data sink itself
-   * @param parallelism the parallelism of this processor
-   * @param description the description for this processor
-   * @param taskConf    the configuration for this processor
-   * @param system      actor system
-   * @return the new created sink processor
-   */
-  public static Processor<DataSinkTask> sink(DataSink dataSink, int 
parallelism, String description, UserConfig taskConf, ActorSystem system) {
-    io.gearpump.streaming.Processor<DataSinkTask> p = 
DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system);
-    return new Processor(p);
-  }
-
-  /**
-   * Creates a Source Processor
-   *
-   * @param source      the data source itself
-   * @param parallelism the parallelism of this processor
-   * @param description the description of this processor
-   * @param taskConf    the configuration of this processor
-   * @param system      actor system
-   * @return the new created source processor
-   */
-  public static Processor<DataSourceTask> source(DataSource source, int 
parallelism, String description, UserConfig taskConf, ActorSystem system) {
-    io.gearpump.streaming.Processor<DataSourceTask> p = 
DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
-    return new Processor(p);
-  }
-
-  public Processor(io.gearpump.streaming.Processor<T> processor) {
-    this._taskClass = (Class) (processor.taskClass());
-    this._parallelism = processor.parallelism();
-    this._description = processor.description();
-    this._userConf = processor.taskConf();
-  }
-
-  /**
-   * Creates a general processor with user specified task logic.
-   *
-   * @param taskClass    task implementation class of this processor (shall be 
a derived class from {@link Task}
-   * @param parallelism, how many initial tasks you want to use
-   * @param description, some text to describe this processor
-   * @param taskConf,    Processor specific configuration
-   */
-  public Processor(Class<T> taskClass, int parallelism, String description, 
UserConfig taskConf) {
-    this._taskClass = taskClass;
-    this._parallelism = parallelism;
-    this._description = description;
-    this._userConf = taskConf;
-  }
-
-  public Processor<T> withParallelism(int parallel) {
-    return new Processor<T>(_taskClass, parallel, _description, _userConf);
-  }
-
-  public Processor<T> withDescription(String desc) {
-    return new Processor<T>(_taskClass, _parallelism, desc, _userConf);
-  }
-
-  public Processor<T> withConfig(UserConfig conf) {
-    return new Processor<T>(_taskClass, _parallelism, _description, conf);
-  }
-
-  @Override
-  public int parallelism() {
-    return _parallelism;
-  }
-
-  @Override
-  public UserConfig taskConf() {
-    return _userConf;
-  }
-
-  @Override
-  public String description() {
-    return _description;
-  }
-
-  @Override
-  public Class<? extends io.gearpump.streaming.task.Task> taskClass() {
-    return _taskClass;
-  }
-
-  /**
-   * reference equal
-   */
-  @Override
-  public boolean equals(Object obj) {
-    return (this == obj);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
deleted file mode 100644
index 150a26f..0000000
--- 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi;
-
-import akka.actor.ActorSystem;
-import io.gearpump.cluster.Application;
-import io.gearpump.cluster.ApplicationMaster;
-import io.gearpump.cluster.UserConfig;
-
-/**
- * Java version of StreamApplication.
- *
- * Also see {@link io.gearpump.streaming.StreamApplication}
- */
-public class StreamApplication implements Application {
-  private io.gearpump.streaming.StreamApplication app;
-  /**
-   * Creates a streaming application
-   *
-   * @param name  Name of the application
-   * @param conf  User configuration
-   * @param graph The DAG
-   */
-  public StreamApplication(String name, UserConfig conf, Graph graph) {
-    //by pass the tricky type check in scala 2.10
-    io.gearpump.util.Graph untypedGraph = graph;
-    this.app = io.gearpump.streaming.StreamApplication.apply(
-      name, untypedGraph, conf);
-  }
-
-  @Override
-  public String name() {
-    return app.name();
-  }
-
-  @Override
-  public UserConfig userConfig(ActorSystem system) {
-    return app.userConfig(system);
-  }
-
-  @Override
-  public Class<? extends ApplicationMaster> appMaster() {
-    return app.appMaster();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
deleted file mode 100644
index 45fae19..0000000
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi;
-
-import akka.actor.ActorRef;
-import io.gearpump.Message;
-import io.gearpump.cluster.UserConfig;
-import io.gearpump.streaming.task.StartTime;
-import io.gearpump.streaming.task.TaskContext;
-
-/**
- * Java version of Task.
- *
- * See {@link io.gearpump.streaming.task.Task}
- */
-public class Task extends io.gearpump.streaming.task.Task {
-  protected TaskContext context;
-  protected UserConfig userConf;
-
-  public Task(TaskContext taskContext, UserConfig userConf) {
-    super(taskContext, userConf);
-    this.context = taskContext;
-    this.userConf = userConf;
-  }
-
-  @Override
-  final public ActorRef self() {
-    return context.self();
-  }
-
-  @Override
-  public void onStart(StartTime startTime) {
-  }
-
-  @Override
-  public void onNext(Message msg) {
-  }
-
-  @Override
-  public void onStop() {
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
deleted file mode 100644
index bb97442..0000000
--- 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Filter function
- *
- * @param <T> Message of type T
- */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
deleted file mode 100644
index 3e18cf1..0000000
--- 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * Function that converts a value of type T to a iterator of values of type R.
- *
- * @param <T> Input value type
- * @param <R> Return value type
- */
-public interface FlatMapFunction<T, R> extends Serializable {
-  Iterator<R> apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
deleted file mode 100644
index 2ba524e..0000000
--- 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * GroupBy function which assign value of type T to groups
- *
- * @param <T> Input value type
- * @param <Group> Group Type
- */
-public interface GroupByFunction<T, Group> extends Serializable {
-  Group apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
deleted file mode 100644
index b4bd6ac..0000000
--- 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that map a value of type T to value of type R
- *
- * @param <T> Input value type
- * @param <R> Output value type
- */
-public interface MapFunction<T, R> extends Serializable {
-  R apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
 
b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
deleted file mode 100644
index f439c0a..0000000
--- 
a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that applies reduce operation
- *
- * @param <T> Input value type
- */
-public interface ReduceFunction<T> extends Serializable {
-  T apply(T t1, T t2);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
new file mode 100644
index 0000000..8f85aa3
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi;
+
+import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.Processor;
+import org.apache.gearpump.streaming.task.Task;
+
+/**
+ * Java version of Graph
+ *
+ * See {@link org.apache.gearpump.util.Graph}
+ */
+public class Graph extends org.apache.gearpump.util.Graph<Processor<? extends 
Task>, Partitioner> {
+
+  public Graph() {
+    super(null, null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
new file mode 100644
index 0000000..8757081
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi;
+
+import akka.actor.ActorSystem;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.sink.DataSink;
+import org.apache.gearpump.streaming.sink.DataSinkProcessor;
+import org.apache.gearpump.streaming.sink.DataSinkTask;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.DataSourceProcessor;
+import org.apache.gearpump.streaming.source.DataSourceTask;
+
+/**
+ * Java version of Processor
+ *
+ * See {@link org.apache.gearpump.streaming.Processor}
+ */
+public class Processor<T extends org.apache.gearpump.streaming.task.Task> 
implements org.apache.gearpump.streaming.Processor<T> {
+  private Class<T> _taskClass;
+  private int _parallelism = 1;
+  private String _description = "";
+  private UserConfig _userConf = UserConfig.empty();
+
+  public Processor(Class<T> taskClass) {
+    this._taskClass = taskClass;
+  }
+
+  public Processor(Class<T> taskClass, int parallelism) {
+    this._taskClass = taskClass;
+    this._parallelism = parallelism;
+  }
+
+  /**
+   * Creates a Sink Processor
+   *
+   * @param dataSink    the data sink itself
+   * @param parallelism the parallelism of this processor
+   * @param description the description for this processor
+   * @param taskConf    the configuration for this processor
+   * @param system      actor system
+   * @return the new created sink processor
+   */
+  public static Processor<DataSinkTask> sink(DataSink dataSink, int 
parallelism, String description, UserConfig taskConf, ActorSystem system) {
+    org.apache.gearpump.streaming.Processor<DataSinkTask> p = 
DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system);
+    return new Processor(p);
+  }
+
+  /**
+   * Creates a Source Processor
+   *
+   * @param source      the data source itself
+   * @param parallelism the parallelism of this processor
+   * @param description the description of this processor
+   * @param taskConf    the configuration of this processor
+   * @param system      actor system
+   * @return the new created source processor
+   */
+  public static Processor<DataSourceTask> source(DataSource source, int 
parallelism, String description, UserConfig taskConf, ActorSystem system) {
+    org.apache.gearpump.streaming.Processor<DataSourceTask> p = 
DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
+    return new Processor(p);
+  }
+
+  public Processor(org.apache.gearpump.streaming.Processor<T> processor) {
+    this._taskClass = (Class) (processor.taskClass());
+    this._parallelism = processor.parallelism();
+    this._description = processor.description();
+    this._userConf = processor.taskConf();
+  }
+
+  /**
+   * Creates a general processor with user specified task logic.
+   *
+   * @param taskClass    task implementation class of this processor (shall be 
a derived class from {@link Task}
+   * @param parallelism, how many initial tasks you want to use
+   * @param description, some text to describe this processor
+   * @param taskConf,    Processor specific configuration
+   */
+  public Processor(Class<T> taskClass, int parallelism, String description, 
UserConfig taskConf) {
+    this._taskClass = taskClass;
+    this._parallelism = parallelism;
+    this._description = description;
+    this._userConf = taskConf;
+  }
+
+  public Processor<T> withParallelism(int parallel) {
+    return new Processor<T>(_taskClass, parallel, _description, _userConf);
+  }
+
+  public Processor<T> withDescription(String desc) {
+    return new Processor<T>(_taskClass, _parallelism, desc, _userConf);
+  }
+
+  public Processor<T> withConfig(UserConfig conf) {
+    return new Processor<T>(_taskClass, _parallelism, _description, conf);
+  }
+
+  @Override
+  public int parallelism() {
+    return _parallelism;
+  }
+
+  @Override
+  public UserConfig taskConf() {
+    return _userConf;
+  }
+
+  @Override
+  public String description() {
+    return _description;
+  }
+
+  @Override
+  public Class<? extends org.apache.gearpump.streaming.task.Task> taskClass() {
+    return _taskClass;
+  }
+
+  /**
+   * reference equal
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return (this == obj);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java
new file mode 100644
index 0000000..b15685e
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi;
+
+import akka.actor.ActorSystem;
+import org.apache.gearpump.cluster.Application;
+import org.apache.gearpump.cluster.ApplicationMaster;
+import org.apache.gearpump.cluster.UserConfig;
+
+/**
+ * Java version of StreamApplication.
+ *
+ * Also see {@link org.apache.gearpump.streaming.StreamApplication}
+ */
+public class StreamApplication implements Application {
+  private org.apache.gearpump.streaming.StreamApplication app;
+  /**
+   * Creates a streaming application
+   *
+   * @param name  Name of the application
+   * @param conf  User configuration
+   * @param graph The DAG
+   */
+  public StreamApplication(String name, UserConfig conf, Graph graph) {
+    //by pass the tricky type check in scala 2.10
+    org.apache.gearpump.util.Graph untypedGraph = graph;
+    this.app = org.apache.gearpump.streaming.StreamApplication.apply(
+      name, untypedGraph, conf);
+  }
+
+  @Override
+  public String name() {
+    return app.name();
+  }
+
+  @Override
+  public UserConfig userConfig(ActorSystem system) {
+    return app.userConfig(system);
+  }
+
+  @Override
+  public Class<? extends ApplicationMaster> appMaster() {
+    return app.appMaster();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
new file mode 100644
index 0000000..2efce45
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi;
+
+import akka.actor.ActorRef;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.task.StartTime;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+/**
+ * Java version of Task.
+ *
+ * See {@link org.apache.gearpump.streaming.task.Task}
+ */
+public class Task extends org.apache.gearpump.streaming.task.Task {
+  protected TaskContext context;
+  protected UserConfig userConf;
+
+  public Task(TaskContext taskContext, UserConfig userConf) {
+    super(taskContext, userConf);
+    this.context = taskContext;
+    this.userConf = userConf;
+  }
+
+  @Override
+  final public ActorRef self() {
+    return context.self();
+  }
+
+  @Override
+  public void onStart(StartTime startTime) {
+  }
+
+  @Override
+  public void onNext(Message msg) {
+  }
+
+  @Override
+  public void onStop() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
new file mode 100644
index 0000000..f07ceff
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * Filter function
+ *
+ * @param <T> Message of type T
+ */
+public interface FilterFunction<T> extends Serializable {
+  boolean apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
new file mode 100644
index 0000000..9788dd2
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Function that converts a value of type T to a iterator of values of type R.
+ *
+ * @param <T> Input value type
+ * @param <R> Return value type
+ */
+public interface FlatMapFunction<T, R> extends Serializable {
+  Iterator<R> apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
new file mode 100644
index 0000000..6c71280
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * GroupBy function which assign value of type T to groups
+ *
+ * @param <T> Input value type
+ * @param <Group> Group Type
+ */
+public interface GroupByFunction<T, Group> extends Serializable {
+  Group apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
new file mode 100644
index 0000000..e1fc821
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * Function that map a value of type T to value of type R
+ *
+ * @param <T> Input value type
+ * @param <R> Output value type
+ */
+public interface MapFunction<T, R> extends Serializable {
+  R apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
new file mode 100644
index 0000000..2bcac60
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * Function that applies reduce operation
+ *
+ * @param <T> Input value type
+ */
+public interface ReduceFunction<T> extends Serializable {
+  T apply(T t1, T t2);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/streaming/src/main/resources/geardefault.conf 
b/streaming/src/main/resources/geardefault.conf
index 219543e..a484f3c 100644
--- a/streaming/src/main/resources/geardefault.conf
+++ b/streaming/src/main/resources/geardefault.conf
@@ -21,11 +21,11 @@ gearpump {
       ### Whitelist for Metrics Aggregator class.
       ### See class [[MetricsAggregator]] for more information.
       metrics-aggregator-class {
-        "io.gearpump.streaming.metrics.TaskFilterAggregator" = ""
-        "io.gearpump.streaming.metrics.ProcessorAggregator" = ""
+        "org.apache.gearpump.streaming.metrics.TaskFilterAggregator" = ""
+        "org.apache.gearpump.streaming.metrics.ProcessorAggregator" = ""
       }
     }
   }
 
-  transport.serializer = 
"io.gearpump.streaming.task.StreamingTransportSerializer"
+  transport.serializer = 
"org.apache.gearpump.streaming.task.StreamingTransportSerializer"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala 
b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
deleted file mode 100644
index 06cf022..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-import scala.language.existentials
-
-import akka.actor.ActorRef
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.appmaster.WorkerInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
-import io.gearpump.streaming.task.{Subscriber, TaskId}
-import io.gearpump.transport.HostPort
-
-object AppMasterToExecutor {
-  case class LaunchTasks(
-      taskId: List[TaskId], dagVersion: Int, processorDescription: 
ProcessorDescription,
-      subscribers: List[Subscriber])
-
-  case object TasksLaunched
-
-  /**
-   * dagVersion, life, and subscribers will be changed on target task list.
-   */
-  case class ChangeTasks(
-      taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: 
List[Subscriber])
-
-  case class TasksChanged(taskIds: List[TaskId])
-
-  case class ChangeTask(
-      taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: 
List[Subscriber])
-
-  case class TaskChanged(taskId: TaskId, dagVersion: Int)
-
-  case class StartTask(taskId: TaskId)
-
-  case class StopTask(taskId: TaskId)
-
-  case class TaskLocationsReady(taskLocations: TaskLocations, dagVersion: Int)
-
-  case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId)
-
-  case class TaskLocationsRejected(
-      dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
-
-  case class StartAllTasks(dagVersion: Int)
-
-  case class StartDynamicDag(dagVersion: Int)
-  case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: 
TimeStamp)
-  case class TaskRejected(taskId: TaskId)
-
-  case object RestartClockService
-  class MsgLostException extends Exception
-}
-
-object ExecutorToAppMaster {
-  case class RegisterExecutor(
-      executor: ActorRef, executorId: Int, resource: Resource, worker : 
WorkerInfo)
-
-  case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort)
-  case class UnRegisterTask(taskId: TaskId, executorId: Int)
-
-  case class MessageLoss(executorId: Int, taskId: TaskId, cause: String)
-}
-
-object AppMasterToMaster {
-  case class StallingTasks(tasks: List[TaskId])
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala 
b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
deleted file mode 100644
index becf31a..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-object Constants {
-
-  val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
-  val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source"
-  val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink"
-  val GEARPUMP_STREAMING_GROUPBY_FUNCTION = 
"gearpump.streaming.dsl.groupby-function"
-
-  val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
-
-  val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = 
"gearpump.streaming.register-task-timeout-ms"
-
-  val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT =
-    "gearpump.streaming.max-pending-message-count-per-connection"
-
-  val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT =
-    "gearpump.streaming.ack-once-every-message-count"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala 
b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
deleted file mode 100644
index a73fd48..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.Graph
-
-/**
- * DAG is wrapper for [[io.gearpump.util.Graph]] for streaming applications.
- */
-case class DAG(version: Int, processors : Map[ProcessorId, 
ProcessorDescription],
-    graph : Graph[ProcessorId, PartitionerDescription])
-  extends Serializable {
-
-  def isEmpty: Boolean = {
-    processors.isEmpty
-  }
-
-  def taskCount: Int = {
-    processors.foldLeft(0) { (count, task) =>
-      count + task._2.parallelism
-    }
-  }
-
-  def tasks: List[TaskId] = {
-    processors.flatMap { pair =>
-      val (processorId, processor) = pair
-      (0 until processor.parallelism).map(TaskId(processorId, _))
-    }.toList
-  }
-}
-
-object DAG {
-  def apply(graph: Graph[ProcessorDescription, PartitionerDescription], 
version: Int = 0): DAG = {
-    val processors = graph.vertices.map { processorDescription =>
-      (processorDescription.id, processorDescription)
-    }.toMap
-    val dag = graph.mapVertex { processor =>
-      processor.id
-    }
-    new DAG(version, processors, dag)
-  }
-
-  def empty: DAG = apply(Graph.empty[ProcessorDescription, 
PartitionerDescription])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala 
b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
deleted file mode 100644
index 8eb866d..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-import java.io.{DataInput, DataOutput}
-
-import io.gearpump.streaming.task._
-
-class TaskIdSerializer extends TaskMessageSerializer[TaskId] {
-  override def getLength(obj: TaskId): Int = 8
-
-  override def write(dataOutput: DataOutput, obj: TaskId): Unit = {
-    dataOutput.writeInt(obj.processorId)
-    dataOutput.writeInt(obj.index)
-  }
-
-  override def read(dataInput: DataInput): TaskId = {
-    val processorId = dataInput.readInt()
-    val index = dataInput.readInt()
-    new TaskId(processorId, index)
-  }
-}
-
-class AckSerializer extends TaskMessageSerializer[Ack] {
-  val taskIdSerializer = new TaskIdSerializer
-
-  override def getLength(obj: Ack): Int = 
taskIdSerializer.getLength(obj.taskId) + 8
-
-  override def write(dataOutput: DataOutput, obj: Ack): Unit = {
-    taskIdSerializer.write(dataOutput, obj.taskId)
-    dataOutput.writeShort(obj.seq)
-    dataOutput.writeShort(obj.actualReceivedNum)
-    dataOutput.writeInt(obj.sessionId)
-  }
-
-  override def read(dataInput: DataInput): Ack = {
-    val taskId = taskIdSerializer.read(dataInput)
-    val seq = dataInput.readShort()
-    val actualReceivedNum = dataInput.readShort()
-    val sessionId = dataInput.readInt()
-    Ack(taskId, seq, actualReceivedNum, sessionId)
-  }
-}
-
-class InitialAckRequestSerializer extends 
TaskMessageSerializer[InitialAckRequest] {
-  val taskIdSerialzer = new TaskIdSerializer()
-
-  override def getLength(obj: InitialAckRequest): Int = 
taskIdSerialzer.getLength(obj.taskId) + 4
-
-  override def write(dataOutput: DataOutput, obj: InitialAckRequest): Unit = {
-    taskIdSerialzer.write(dataOutput, obj.taskId)
-    dataOutput.writeInt(obj.sessionId)
-  }
-
-  override def read(dataInput: DataInput): InitialAckRequest = {
-    val taskId = taskIdSerialzer.read(dataInput)
-    val sessionId = dataInput.readInt()
-    InitialAckRequest(taskId, sessionId)
-  }
-}
-
-class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
-  val taskIdSerializer = new TaskIdSerializer
-
-  override def getLength(obj: AckRequest): Int = 
taskIdSerializer.getLength(obj.taskId) + 6
-
-  override def write(dataOutput: DataOutput, obj: AckRequest): Unit = {
-    taskIdSerializer.write(dataOutput, obj.taskId)
-    dataOutput.writeShort(obj.seq)
-    dataOutput.writeInt(obj.sessionId)
-  }
-
-  override def read(dataInput: DataInput): AckRequest = {
-    val taskId = taskIdSerializer.read(dataInput)
-    val seq = dataInput.readShort()
-    val sessionId = dataInput.readInt()
-    AckRequest(taskId, seq, sessionId)
-  }
-}
-
-class LatencyProbeSerializer extends TaskMessageSerializer[LatencyProbe] {
-  override def getLength(obj: LatencyProbe): Int = 8
-
-  override def write(dataOutput: DataOutput, obj: LatencyProbe): Unit = {
-    dataOutput.writeLong(obj.timestamp)
-  }
-
-  override def read(dataInput: DataInput): LatencyProbe = {
-    val timestamp = dataInput.readLong()
-    LatencyProbe(timestamp)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala 
b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
deleted file mode 100644
index 0ca4d92..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
-
-import akka.actor.ActorSystem
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster._
-import io.gearpump.partitioner.{HashPartitioner, Partitioner, 
PartitionerDescription, PartitionerObject}
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.task.Task
-import io.gearpump.util.{Graph, LogUtil, ReferenceEqual}
-
-/**
- * Processor is the blueprint for tasks.
- */
-trait Processor[+T <: Task] extends ReferenceEqual {
-
-  /**
-   * How many tasks you want to use for this processor.
-   */
-  def parallelism: Int
-
-  /**
-   * The custom [[io.gearpump.cluster.UserConfig]], it is used to initialize a 
task in runtime.
-   */
-  def taskConf: UserConfig
-
-  /**
-   * Some description text for this processor.
-   */
-  def description: String
-
-  /**
-   * The task class, should be a subtype of Task.
-   *
-   * Each runtime instance of this class is a task.
-   */
-  def taskClass: Class[_ <: Task]
-}
-
-object Processor {
-  def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ 
<: Task])
-    : ProcessorDescription = {
-    import processor._
-    ProcessorDescription(id, taskClass.getName, parallelism, description, 
taskConf)
-  }
-
-  def apply[T<: Task](
-      parallelism : Int, description: String = "",
-      taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T])
-    : DefaultProcessor[T] = {
-    new DefaultProcessor[T](parallelism, description, taskConf,
-      classtag.runtimeClass.asInstanceOf[Class[T]])
-  }
-
-  def apply[T<: Task](
-      taskClazz: Class[T], parallelism : Int, description: String, taskConf: 
UserConfig)
-    : DefaultProcessor[T] = {
-    new DefaultProcessor[T](parallelism, description, taskConf, taskClazz)
-  }
-
-  case class DefaultProcessor[T<: Task](
-      parallelism : Int, description: String, taskConf: UserConfig, taskClass: 
Class[T])
-    extends Processor[T] {
-
-    def withParallelism(parallel: Int): DefaultProcessor[T] = {
-      new DefaultProcessor[T](parallel, description, taskConf, taskClass)
-    }
-
-    def withDescription(desc: String): DefaultProcessor[T] = {
-      new DefaultProcessor[T](parallelism, desc, taskConf, taskClass)
-    }
-
-    def withConfig(conf: UserConfig): DefaultProcessor[T] = {
-      new DefaultProcessor[T](parallelism, description, conf, taskClass)
-    }
-  }
-}
-
-/**
- * Each processor has a LifeTime.
- *
- * When input message's timestamp is beyond current processor's lifetime,
- * then it will not be processed by this processor.
- */
-case class LifeTime(birth: TimeStamp, death: TimeStamp) {
-  def contains(timestamp: TimeStamp): Boolean = {
-    timestamp >= birth && timestamp < death
-  }
-
-  def cross(another: LifeTime): LifeTime = {
-    LifeTime(Math.max(birth, another.birth), Math.min(death, another.death))
-  }
-}
-
-object LifeTime {
-  val Immortal = LifeTime(0L, Long.MaxValue)
-}
-
-/**
- * Represent a streaming application
- */
-class StreamApplication(
-    override val name: String, val inputUserConfig: UserConfig,
-    val dag: Graph[ProcessorDescription, PartitionerDescription])
-  extends Application {
-
-  require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
-
-  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
-  override def userConfig(implicit system: ActorSystem): UserConfig = {
-    inputUserConfig.withValue(StreamApplication.DAG, dag)
-  }
-}
-
-case class ProcessorDescription(
-    id: ProcessorId,
-    taskClass: String,
-    parallelism : Int,
-    description: String = "",
-    taskConf: UserConfig = null,
-    life: LifeTime = LifeTime.Immortal,
-    jar: AppJar = null) extends ReferenceEqual
-
-object StreamApplication {
-
-  private val hashPartitioner = new HashPartitioner()
-  private val LOG = LogUtil.getLogger(getClass)
-
-  def apply[T <: Processor[Task], P <: Partitioner](
-      name: String, dag: Graph[T, P], userConfig: UserConfig): 
StreamApplication = {
-    import io.gearpump.streaming.Processor._
-
-    if (dag.hasCycle()) {
-      LOG.warn(s"Detected cycles in DAG of application $name!")
-    }
-
-    val indices = 
dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
-    val graph = dag.mapVertex { processor =>
-      val updatedProcessor = 
ProcessorToProcessorDescription(indices(processor), processor)
-      updatedProcessor
-    }.mapEdge { (node1, edge, node2) =>
-      PartitionerDescription(new PartitionerObject(
-        Option(edge).getOrElse(StreamApplication.hashPartitioner)))
-    }
-    new StreamApplication(name, userConfig, graph)
-  }
-
-  val DAG = "DAG"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
deleted file mode 100644
index df85017..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import java.lang.management.ManagementFactory
-import scala.concurrent.Future
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump._
-import io.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, 
QueryHistoryMetrics, ShutdownApplication}
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, 
ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, 
LastFailure}
-import io.gearpump.cluster._
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.metrics.Metrics.ReportMetrics
-import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
RegisterExecutor, RegisterTask, UnRegisterTask}
-import io.gearpump.streaming._
-import io.gearpump.streaming.appmaster.AppMaster._
-import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, 
ReplaceProcessor}
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, 
GetExecutorInfo}
-import io.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, 
GetTaskList, TaskList}
-import io.gearpump.streaming.executor.Executor.{ExecutorConfig, 
ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
-import io.gearpump.streaming.storage.InMemoryAppStoreOnMaster
-import io.gearpump.streaming.task._
-import io.gearpump.streaming.util.ActorPathUtil
-import io.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _}
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import io.gearpump.util._
-
-/**
- * AppMaster is the head of a streaming application.
- *
- * It contains:
- *  1. ExecutorManager to manage all executors.
- *  2. TaskManager to manage all tasks,
- *  3. ClockService to track the global clock for this streaming application.
- *  4. Scheduler to decide which a task should be scheduled to.
- */
-class AppMaster(appContext: AppMasterContext, app: AppDescription) extends 
ApplicationMaster {
-  import app.userConfig
-  import appContext.{appId, masterProxy, username}
-
-  private implicit val actorSystem = context.system
-  private implicit val timeOut = FUTURE_TIMEOUT
-
-  import akka.pattern.ask
-  private implicit val dispatcher = context.dispatcher
-
-  private val startTime: TimeStamp = System.currentTimeMillis()
-
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-  LOG.info(s"AppMaster[$appId] is launched by $username, app: $app 
xxxxxxxxxxxxxxxxx")
-  LOG.info(s"AppMaster actor path: ${ActorUtil.getFullPath(context.system, 
self.path)}")
-
-  private val address = ActorUtil.getFullPath(context.system, self.path)
-
-  private val store = new InMemoryAppStoreOnMaster(appId, 
appContext.masterProxy)
-  private val dagManager = context.actorOf(Props(new 
DagManager(appContext.appId, userConfig, store,
-    Some(getUpdatedDAG()))))
-
-  private var taskManager: Option[ActorRef] = None
-  private var clockService: Option[ActorRef] = None
-  private val systemConfig = context.system.settings.config
-  private var lastFailure = LastFailure(0L, null)
-
-  private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
-    self.path.toString,
-    
Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), 
"active")
-
-  private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-
-  private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-
-  private val userDir = System.getProperty("user.dir")
-  private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config)
-
-  private val appMasterExecutorSummary = ExecutorSummary(
-    APPMASTER_DEFAULT_EXECUTOR_ID,
-    
Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified),
-    self.path.toString,
-    logFile.getAbsolutePath,
-    status = "Active",
-    taskCount = 0,
-    tasks = Map.empty[ProcessorId, List[TaskId]],
-    jvmName = ManagementFactory.getRuntimeMXBean().getName()
-  )
-
-  private val historyMetricsService = if (metricsEnabled) {
-    // Registers jvm metrics
-    Metrics(context.system).register(new JvmMetricsSet(
-      s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
-
-    val historyMetricsService = context.actorOf(Props(new 
HistoryMetricsService(
-      s"app$appId", getHistoryMetricsConfig)))
-
-    val metricsReportService = context.actorOf(Props(
-      new MetricsReporterService(Metrics(context.system))))
-    historyMetricsService.tell(ReportMetrics, metricsReportService)
-
-    Some(historyMetricsService)
-  } else {
-    None
-  }
-
-  private val executorManager: ActorRef =
-    context.actorOf(ExecutorManager.props(userConfig, appContext, 
app.clusterConfig, app.name),
-      ActorPathUtil.executorManagerActorName)
-
-  for (dag <- getDAG) {
-    clockService = Some(context.actorOf(Props(new ClockService(dag, store))))
-    val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context)
-
-    taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, 
dagManager,
-      jarScheduler, executorManager, clockService.get, self, app.name))))
-  }
-
-  override def receive: Receive = {
-    taskMessageHandler orElse
-      executorMessageHandler orElse
-      recover orElse
-      appMasterService orElse
-      ActorUtil.defaultMsgHandler(self)
-  }
-
-  /** Handles messages from Tasks */
-  def taskMessageHandler: Receive = {
-    case clock: ClockEvent =>
-      taskManager.foreach(_ forward clock)
-    case register: RegisterTask =>
-      taskManager.foreach(_ forward register)
-    case unRegister: UnRegisterTask =>
-      taskManager.foreach(_ forward unRegister)
-      // Checks whether this processor dead, if it is, then we should remove 
it from clockService.
-      clockService.foreach(_ forward 
CheckProcessorDeath(unRegister.taskId.processorId))
-    case replay: ReplayFromTimestampWindowTrailingEdge =>
-      taskManager.foreach(_ forward replay)
-    case messageLoss: MessageLoss =>
-      lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause)
-      taskManager.foreach(_ forward messageLoss)
-    case lookupTask: LookupTaskActorRef =>
-      taskManager.foreach(_ forward lookupTask)
-    case checkpoint: ReportCheckpointClock =>
-      clockService.foreach(_ forward checkpoint)
-    case GetDAG =>
-      val task = sender
-      getDAG.foreach {
-        dag => task ! dag
-      }
-    case GetCheckpointClock =>
-      clockService.foreach(_ forward GetCheckpointClock)
-  }
-
-  /** Handles messages from Executors */
-  def executorMessageHandler: Receive = {
-    case register: RegisterExecutor =>
-      executorManager forward register
-    case ReportMetrics =>
-      historyMetricsService.foreach(_ forward ReportMetrics)
-  }
-
-  /** Handles messages from AppMaster */
-  def appMasterService: Receive = {
-    case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
-      LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ")
-
-      val executorsFuture = executorBrief
-      val clockFuture = getMinClock
-      val taskFuture = getTaskList
-      val dagFuture = getDAG
-
-      val appMasterDataDetail = for {
-        executors <- executorsFuture
-        clock <- clockFuture
-        tasks <- taskFuture
-        dag <- dagFuture
-      } yield {
-        val graph = dag.graph
-
-        val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {
-          _.keys.toList
-        }
-
-        val processors = dag.processors.map { kv =>
-          val processor = kv._2
-          import processor._
-          val tasks = executorToTasks.map { kv =>
-            (kv._1, TaskCount(kv._2.count(_.processorId == id)))
-          }.filter(_._2.count != 0)
-          (id,
-            ProcessorSummary(id, taskClass, parallelism, description, 
taskConf, life,
-              tasks.keys.toList, tasks))
-        }
-
-        StreamAppMasterSummary(
-          appId = appId,
-          appName = app.name,
-          actorPath = address,
-          clock = clock,
-          status = MasterToAppMaster.AppMasterActive,
-          startTime = startTime,
-          uptime = System.currentTimeMillis() - startTime,
-          user = username,
-          homeDirectory = userDir,
-          logFile = logFile.getAbsolutePath,
-          processors = processors,
-          processorLevels = graph.vertexHierarchyLevelMap(),
-          dag = graph.mapEdge { (node1, edge, node2) =>
-            edge.partitionerFactory.name
-          },
-          executors = executors,
-          historyMetricsConfig = getHistoryMetricsConfig
-        )
-      }
-
-      val client = sender()
-
-      appMasterDataDetail.map { appData =>
-        client ! appData
-      }
-    // TODO: WebSocket is buggy and disabled.
-    //    case appMasterMetricsRequest: AppMasterMetricsRequest =>
-    //      val client = sender()
-    //      actorSystem.eventStream.subscribe(client, classOf[MetricType])
-    case query: QueryHistoryMetrics =>
-      if (historyMetricsService.isEmpty) {
-        // Returns empty metrics so that we don't hang the UI
-        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      } else {
-        historyMetricsService.get forward query
-      }
-    case getStalling: GetStallingTasks =>
-      clockService.foreach(_ forward getStalling)
-    case replaceDAG: ReplaceProcessor =>
-      dagManager forward replaceDAG
-    case GetLastFailure(_) =>
-      sender ! lastFailure
-    case get@GetExecutorSummary(executorId) =>
-      val client = sender
-      if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
-        client ! appMasterExecutorSummary
-      } else {
-        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, 
GetExecutorInfo)
-          .map { map =>
-            map.get(executorId).foreach { executor =>
-              executor.executor.tell(get, client)
-            }
-          }
-      }
-    case query@QueryExecutorConfig(executorId) =>
-      val client = sender
-      if (executorId == -1) {
-        val systemConfig = context.system.settings.config
-        sender ! 
ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-      } else {
-        ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, 
GetExecutorInfo)
-          .map { map =>
-            map.get(executorId).foreach { executor =>
-              executor.executor.tell(query, client)
-            }
-          }
-      }
-  }
-
-  /** Error handling */
-  def recover: Receive = {
-    case FailedToRecover(errorMsg) =>
-      if (context.children.toList.contains(sender())) {
-        LOG.error(errorMsg)
-        masterProxy ! ShutdownApplication(appId)
-      }
-    case AllocateResourceTimeOut =>
-      LOG.error(s"Failed to allocate resource in time, shutdown application 
$appId")
-      masterProxy ! ShutdownApplication(appId)
-      context.stop(self)
-  }
-
-  private def getMinClock: Future[TimeStamp] = {
-    clockService match {
-      case Some(clockService) =>
-        (clockService ? 
GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)
-      case None =>
-        Future.failed(new ServiceNotAvailableException("clock service not 
ready"))
-    }
-  }
-
-  private def executorBrief: Future[List[ExecutorBrief]] = {
-    ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, 
GetExecutorInfo)
-      .map { infos =>
-        infos.values.map { info =>
-          ExecutorBrief(info.executorId,
-            info.executor.path.toSerializationFormat,
-            info.worker.workerId,
-            "active")
-        }.toList :+ appMasterBrief
-      }
-  }
-
-  private def getTaskList: Future[TaskList] = {
-    taskManager match {
-      case Some(taskManager) =>
-        (taskManager ? GetTaskList).asInstanceOf[Future[TaskList]]
-      case None =>
-        Future.failed(new ServiceNotAvailableException("task manager not 
ready"))
-    }
-  }
-
-  private def getDAG: Future[DAG] = {
-    (dagManager ? GetLatestDAG).asInstanceOf[Future[LatestDAG]].map(_.dag)
-  }
-
-  private def getUpdatedDAG(): DAG = {
-    val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
-      PartitionerDescription]](StreamApplication.DAG).get)
-    val updated = dag.processors.map { idAndProcessor =>
-      val (id, oldProcessor) = idAndProcessor
-      val newProcessor = if (oldProcessor.jar == null) {
-        oldProcessor.copy(jar = appContext.appJar.getOrElse(null))
-      } else {
-        oldProcessor
-      }
-      (id, newProcessor)
-    }
-    DAG(dag.version, updated, dag.graph)
-  }
-}
-
-object AppMaster {
-
-  /** Master node doesn't return resource in time */
-  case object AllocateResourceTimeOut
-
-  /** Query task ActorRef by providing the taskId */
-  case class LookupTaskActorRef(taskId: TaskId)
-
-  case class TaskActorRef(task: ActorRef)
-
-  class ServiceNotAvailableException(reason: String) extends Exception(reason)
-
-  case class ExecutorBrief(
-      executorId: ExecutorId, executor: String, workerId: WorkerId, status: 
String)
-
-}
\ No newline at end of file


Reply via email to