http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala new file mode 100644 index 0000000..610f0c7 --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.security.oauth2 + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpEntity.Strict +import akka.http.scaladsl.model.MediaTypes._ +import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.testkit.ScalatestRouteTest +import com.typesafe.config.ConfigFactory +import org.scalatest.FlatSpec + +import org.apache.gearpump.security.Authenticator +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator +import org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator + +class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { + + implicit val actorSystem: ActorSystem = system + private val server = new MockOAuth2Server(system, null) + server.start() + private val serverHost = s"http://127.0.0.1:${server.port}" + + val configMap = Map( + "class" -> "org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator", + "callback" -> s"$serverHost/login/oauth2/google/callback", + "clientid" -> "170234147043-a1tag68jtq6ab4bi11jvsj7vbaqcmhkt.apps.googleusercontent.com", + "clientsecret" -> "ioeWLLDipz2S7aTDXym2-obe", + "default-userrole" -> "guest", + "icon" -> "/icons/google.png") + + val configString = ConfigFactory.parseMap(configMap.asJava) + + private lazy val google = { + val google = new MockGoogleAuthenticator(serverHost) + google.init(configString, system.dispatcher) + google + } + + it should "generate the correct authorization request" in { + val parameters = Uri(google.getAuthorizationUrl()).query().toMap + assert(parameters("response_type") == "code") + assert(parameters("client_id") == configMap("clientid")) + assert(parameters("redirect_uri") == configMap("callback")) + assert(parameters("scope") == GoogleOAuth2Authenticator.Scope) + } + + it should "authenticate the authorization code and return the correct profile" in { + val code = Map("code" -> "4/PME0pfxjiBA42SukR-OTGl7fpFzTWzvZPf1TbkpXL4M#") + val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c" + val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm" + val mail = "[email protected]" + + def accessTokenEndpoint(request: HttpRequest): HttpResponse = { + + assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") + + val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8") + val form = Uri./.withQuery(Query(body)).query().toMap + + assert(form("client_id") == configMap("clientid")) + assert(form("client_secret") == configMap("clientsecret")) + assert(form("grant_type") == "authorization_code") + assert(form("code") == code("code")) + assert(form("redirect_uri") == configMap("callback")) + assert(form("scope") == GoogleOAuth2Authenticator.Scope) + + // scalastyle:off line.size.limit + val response = + s""" + |{ + | "access_token": "$accessToken", + | "token_type": "Bearer", + | "expires_in": 3591, + | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3" + |} + """.stripMargin + // scalastyle:on line.size.limit + + HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) + } + + def protectedResourceEndpoint(request: HttpRequest): HttpResponse = { + assert(request.getUri().query().get("access_token").get == accessToken) + val response = + s""" + |{ + | "kind": "plus#person", + | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8", + | "nickname": "gearpump", + | "gender": "female", + | "emails": [ + | { + | "value": "$mail", + | "type": "account" + | } + | ] + | } + """.stripMargin + HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) + } + + server.requestHandler = (request: HttpRequest) => { + if (request.uri.path.startsWith(Path("/oauth2/v4/token"))) { + accessTokenEndpoint(request) + } else if (request.uri.path.startsWith(Path("/plus/v1/people/me"))) { + protectedResourceEndpoint(request) + } else { + fail("Unexpected access to " + request.uri.toString()) + } + } + + val userFuture = google.authenticate(code) + val user = Await.result(userFuture, 30.seconds) + assert(user.user == mail) + assert(user.permissionLevel == Authenticator.Guest.permissionLevel) + } + + override def cleanUp(): Unit = { + server.stop() + google.close() + super.cleanUp() + } +} + +object GoogleOAuth2AuthenticatorSpec { + class MockGoogleAuthenticator(host: String) extends GoogleOAuth2Authenticator { + protected override def authorizeUrl: String = { + super.authorizeUrl.replace("https://accounts.google.com", host) + } + + protected override def accessTokenEndpoint: String = { + super.accessTokenEndpoint.replace("https://www.googleapis.com", host) + } + + protected override def protectedResourceUrl: String = { + super.protectedResourceUrl.replace("https://www.googleapis.com", host) + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala new file mode 100644 index 0000000..d656361 --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/MockOAuth2Server.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.security.oauth2 + +import scala.concurrent.{Await, Future} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.ServerBinding +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Sink + +import org.apache.gearpump.util.Util +// NOTE: This cannot be removed!! +import org.apache.gearpump.services.util.UpickleUtil._ + +/** + * Serves as a fake OAuth2 server. + */ +class MockOAuth2Server( + actorSystem: ActorSystem, + var requestHandler: HttpRequest => HttpResponse) { + + implicit val system: ActorSystem = actorSystem + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + + private var _port: Int = 0 + private var bindingFuture: Future[ServerBinding] = null + + def port: Int = _port + + def start(): Unit = { + _port = Util.findFreePort().get + + val serverSource = Http().bind(interface = "127.0.0.1", port = _port) + bindingFuture = { + serverSource.to(Sink.foreach { connection => + connection handleWithSyncHandler requestHandler + }).run() + } + } + + def stop(): Unit = { + import scala.concurrent.duration._ + Await.result(bindingFuture.map(_.unbind()), 120.seconds) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala new file mode 100644 index 0000000..2bf6b6b --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.util + +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} +import upickle.default.{read, write} + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.metrics.Metrics.{Counter, MetricType} +import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary} +import org.apache.gearpump.util.Graph + +class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach { + + "UserConfig" should "serialize and deserialize with upickle correctly" in { + val conf = UserConfig.empty.withString("key", "value") + val serialized = write(conf) + val deserialized = read[UserConfig](serialized) + assert(deserialized.getString("key") == Some("value")) + } + + "Graph" should "be able to serialize/deserialize correctly" in { + val graph = new Graph[Int, String](List(0, 1), List((0, "edge", 1))) + val serialized = write(graph) + + val deserialized = read[Graph[Int, String]](serialized) + + graph.vertices.toSet shouldBe deserialized.vertices.toSet + graph.edges.toSet shouldBe deserialized.edges.toSet + } + + "MetricType" should "be able to serialize/deserialize correctly" in { + val metric: MetricType = Counter("counter", 100L) + val serialized = write(metric) + val deserialized = read[MetricType](serialized) + metric shouldBe deserialized + } + + "StreamingAppMasterDataDetail" should "serialize and deserialize with upickle correctly" in { + val app = new StreamAppMasterSummary(appId = 0, + processors = Map.empty[ProcessorId, ProcessorSummary], + processorLevels = Map.empty[ProcessorId, Int] + ) + + val serialized = write(app) + val deserialized = read[StreamAppMasterSummary](serialized) + assert(deserialized == app) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java deleted file mode 100644 index dbae1c4..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi; - -import io.gearpump.partitioner.Partitioner; -import io.gearpump.streaming.Processor; -import io.gearpump.streaming.task.Task; - -/** - * Java version of Graph - * - * See {@link io.gearpump.util.Graph} - */ -public class Graph extends io.gearpump.util.Graph<Processor<? extends Task>, Partitioner> { - - public Graph() { - super(null, null); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java deleted file mode 100644 index 974183e..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi; - -import akka.actor.ActorSystem; -import io.gearpump.cluster.UserConfig; -import io.gearpump.streaming.sink.DataSink; -import io.gearpump.streaming.sink.DataSinkProcessor; -import io.gearpump.streaming.sink.DataSinkTask; -import io.gearpump.streaming.source.DataSource; -import io.gearpump.streaming.source.DataSourceProcessor; -import io.gearpump.streaming.source.DataSourceTask; - -/** - * Java version of Processor - * - * See {@link io.gearpump.streaming.Processor} - */ -public class Processor<T extends io.gearpump.streaming.task.Task> implements io.gearpump.streaming.Processor<T> { - private Class<T> _taskClass; - private int _parallelism = 1; - private String _description = ""; - private UserConfig _userConf = UserConfig.empty(); - - public Processor(Class<T> taskClass) { - this._taskClass = taskClass; - } - - public Processor(Class<T> taskClass, int parallelism) { - this._taskClass = taskClass; - this._parallelism = parallelism; - } - - /** - * Creates a Sink Processor - * - * @param dataSink the data sink itself - * @param parallelism the parallelism of this processor - * @param description the description for this processor - * @param taskConf the configuration for this processor - * @param system actor system - * @return the new created sink processor - */ - public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) { - io.gearpump.streaming.Processor<DataSinkTask> p = DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system); - return new Processor(p); - } - - /** - * Creates a Source Processor - * - * @param source the data source itself - * @param parallelism the parallelism of this processor - * @param description the description of this processor - * @param taskConf the configuration of this processor - * @param system actor system - * @return the new created source processor - */ - public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) { - io.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system); - return new Processor(p); - } - - public Processor(io.gearpump.streaming.Processor<T> processor) { - this._taskClass = (Class) (processor.taskClass()); - this._parallelism = processor.parallelism(); - this._description = processor.description(); - this._userConf = processor.taskConf(); - } - - /** - * Creates a general processor with user specified task logic. - * - * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task} - * @param parallelism, how many initial tasks you want to use - * @param description, some text to describe this processor - * @param taskConf, Processor specific configuration - */ - public Processor(Class<T> taskClass, int parallelism, String description, UserConfig taskConf) { - this._taskClass = taskClass; - this._parallelism = parallelism; - this._description = description; - this._userConf = taskConf; - } - - public Processor<T> withParallelism(int parallel) { - return new Processor<T>(_taskClass, parallel, _description, _userConf); - } - - public Processor<T> withDescription(String desc) { - return new Processor<T>(_taskClass, _parallelism, desc, _userConf); - } - - public Processor<T> withConfig(UserConfig conf) { - return new Processor<T>(_taskClass, _parallelism, _description, conf); - } - - @Override - public int parallelism() { - return _parallelism; - } - - @Override - public UserConfig taskConf() { - return _userConf; - } - - @Override - public String description() { - return _description; - } - - @Override - public Class<? extends io.gearpump.streaming.task.Task> taskClass() { - return _taskClass; - } - - /** - * reference equal - */ - @Override - public boolean equals(Object obj) { - return (this == obj); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java deleted file mode 100644 index 150a26f..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi; - -import akka.actor.ActorSystem; -import io.gearpump.cluster.Application; -import io.gearpump.cluster.ApplicationMaster; -import io.gearpump.cluster.UserConfig; - -/** - * Java version of StreamApplication. - * - * Also see {@link io.gearpump.streaming.StreamApplication} - */ -public class StreamApplication implements Application { - private io.gearpump.streaming.StreamApplication app; - /** - * Creates a streaming application - * - * @param name Name of the application - * @param conf User configuration - * @param graph The DAG - */ - public StreamApplication(String name, UserConfig conf, Graph graph) { - //by pass the tricky type check in scala 2.10 - io.gearpump.util.Graph untypedGraph = graph; - this.app = io.gearpump.streaming.StreamApplication.apply( - name, untypedGraph, conf); - } - - @Override - public String name() { - return app.name(); - } - - @Override - public UserConfig userConfig(ActorSystem system) { - return app.userConfig(system); - } - - @Override - public Class<? extends ApplicationMaster> appMaster() { - return app.appMaster(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java deleted file mode 100644 index 45fae19..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi; - -import akka.actor.ActorRef; -import io.gearpump.Message; -import io.gearpump.cluster.UserConfig; -import io.gearpump.streaming.task.StartTime; -import io.gearpump.streaming.task.TaskContext; - -/** - * Java version of Task. - * - * See {@link io.gearpump.streaming.task.Task} - */ -public class Task extends io.gearpump.streaming.task.Task { - protected TaskContext context; - protected UserConfig userConf; - - public Task(TaskContext taskContext, UserConfig userConf) { - super(taskContext, userConf); - this.context = taskContext; - this.userConf = userConf; - } - - @Override - final public ActorRef self() { - return context.self(); - } - - @Override - public void onStart(StartTime startTime) { - } - - @Override - public void onNext(Message msg) { - } - - @Override - public void onStop() { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java deleted file mode 100644 index bb97442..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Filter function - * - * @param <T> Message of type T - */ -public interface FilterFunction<T> extends Serializable { - boolean apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java deleted file mode 100644 index 3e18cf1..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; -import java.util.Iterator; - -/** - * Function that converts a value of type T to a iterator of values of type R. - * - * @param <T> Input value type - * @param <R> Return value type - */ -public interface FlatMapFunction<T, R> extends Serializable { - Iterator<R> apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java deleted file mode 100644 index 2ba524e..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * GroupBy function which assign value of type T to groups - * - * @param <T> Input value type - * @param <Group> Group Type - */ -public interface GroupByFunction<T, Group> extends Serializable { - Group apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java deleted file mode 100644 index b4bd6ac..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that map a value of type T to value of type R - * - * @param <T> Input value type - * @param <R> Output value type - */ -public interface MapFunction<T, R> extends Serializable { - R apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java deleted file mode 100644 index f439c0a..0000000 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that applies reduce operation - * - * @param <T> Input value type - */ -public interface ReduceFunction<T> extends Serializable { - T apply(T t1, T t2); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java new file mode 100644 index 0000000..8f85aa3 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi; + +import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.streaming.Processor; +import org.apache.gearpump.streaming.task.Task; + +/** + * Java version of Graph + * + * See {@link org.apache.gearpump.util.Graph} + */ +public class Graph extends org.apache.gearpump.util.Graph<Processor<? extends Task>, Partitioner> { + + public Graph() { + super(null, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java new file mode 100644 index 0000000..8757081 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi; + +import akka.actor.ActorSystem; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.sink.DataSink; +import org.apache.gearpump.streaming.sink.DataSinkProcessor; +import org.apache.gearpump.streaming.sink.DataSinkTask; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.source.DataSourceProcessor; +import org.apache.gearpump.streaming.source.DataSourceTask; + +/** + * Java version of Processor + * + * See {@link org.apache.gearpump.streaming.Processor} + */ +public class Processor<T extends org.apache.gearpump.streaming.task.Task> implements org.apache.gearpump.streaming.Processor<T> { + private Class<T> _taskClass; + private int _parallelism = 1; + private String _description = ""; + private UserConfig _userConf = UserConfig.empty(); + + public Processor(Class<T> taskClass) { + this._taskClass = taskClass; + } + + public Processor(Class<T> taskClass, int parallelism) { + this._taskClass = taskClass; + this._parallelism = parallelism; + } + + /** + * Creates a Sink Processor + * + * @param dataSink the data sink itself + * @param parallelism the parallelism of this processor + * @param description the description for this processor + * @param taskConf the configuration for this processor + * @param system actor system + * @return the new created sink processor + */ + public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) { + org.apache.gearpump.streaming.Processor<DataSinkTask> p = DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system); + return new Processor(p); + } + + /** + * Creates a Source Processor + * + * @param source the data source itself + * @param parallelism the parallelism of this processor + * @param description the description of this processor + * @param taskConf the configuration of this processor + * @param system actor system + * @return the new created source processor + */ + public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) { + org.apache.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system); + return new Processor(p); + } + + public Processor(org.apache.gearpump.streaming.Processor<T> processor) { + this._taskClass = (Class) (processor.taskClass()); + this._parallelism = processor.parallelism(); + this._description = processor.description(); + this._userConf = processor.taskConf(); + } + + /** + * Creates a general processor with user specified task logic. + * + * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task} + * @param parallelism, how many initial tasks you want to use + * @param description, some text to describe this processor + * @param taskConf, Processor specific configuration + */ + public Processor(Class<T> taskClass, int parallelism, String description, UserConfig taskConf) { + this._taskClass = taskClass; + this._parallelism = parallelism; + this._description = description; + this._userConf = taskConf; + } + + public Processor<T> withParallelism(int parallel) { + return new Processor<T>(_taskClass, parallel, _description, _userConf); + } + + public Processor<T> withDescription(String desc) { + return new Processor<T>(_taskClass, _parallelism, desc, _userConf); + } + + public Processor<T> withConfig(UserConfig conf) { + return new Processor<T>(_taskClass, _parallelism, _description, conf); + } + + @Override + public int parallelism() { + return _parallelism; + } + + @Override + public UserConfig taskConf() { + return _userConf; + } + + @Override + public String description() { + return _description; + } + + @Override + public Class<? extends org.apache.gearpump.streaming.task.Task> taskClass() { + return _taskClass; + } + + /** + * reference equal + */ + @Override + public boolean equals(Object obj) { + return (this == obj); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java new file mode 100644 index 0000000..b15685e --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/StreamApplication.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi; + +import akka.actor.ActorSystem; +import org.apache.gearpump.cluster.Application; +import org.apache.gearpump.cluster.ApplicationMaster; +import org.apache.gearpump.cluster.UserConfig; + +/** + * Java version of StreamApplication. + * + * Also see {@link org.apache.gearpump.streaming.StreamApplication} + */ +public class StreamApplication implements Application { + private org.apache.gearpump.streaming.StreamApplication app; + /** + * Creates a streaming application + * + * @param name Name of the application + * @param conf User configuration + * @param graph The DAG + */ + public StreamApplication(String name, UserConfig conf, Graph graph) { + //by pass the tricky type check in scala 2.10 + org.apache.gearpump.util.Graph untypedGraph = graph; + this.app = org.apache.gearpump.streaming.StreamApplication.apply( + name, untypedGraph, conf); + } + + @Override + public String name() { + return app.name(); + } + + @Override + public UserConfig userConfig(ActorSystem system) { + return app.userConfig(system); + } + + @Override + public Class<? extends ApplicationMaster> appMaster() { + return app.appMaster(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java new file mode 100644 index 0000000..2efce45 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi; + +import akka.actor.ActorRef; +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.TaskContext; + +/** + * Java version of Task. + * + * See {@link org.apache.gearpump.streaming.task.Task} + */ +public class Task extends org.apache.gearpump.streaming.task.Task { + protected TaskContext context; + protected UserConfig userConf; + + public Task(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.context = taskContext; + this.userConf = userConf; + } + + @Override + final public ActorRef self() { + return context.self(); + } + + @Override + public void onStart(StartTime startTime) { + } + + @Override + public void onNext(Message msg) { + } + + @Override + public void onStop() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java new file mode 100644 index 0000000..f07ceff --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi.dsl.functions; + +import java.io.Serializable; + +/** + * Filter function + * + * @param <T> Message of type T + */ +public interface FilterFunction<T> extends Serializable { + boolean apply(T t); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java new file mode 100644 index 0000000..9788dd2 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi.dsl.functions; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * Function that converts a value of type T to a iterator of values of type R. + * + * @param <T> Input value type + * @param <R> Return value type + */ +public interface FlatMapFunction<T, R> extends Serializable { + Iterator<R> apply(T t); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java new file mode 100644 index 0000000..6c71280 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi.dsl.functions; + +import java.io.Serializable; + +/** + * GroupBy function which assign value of type T to groups + * + * @param <T> Input value type + * @param <Group> Group Type + */ +public interface GroupByFunction<T, Group> extends Serializable { + Group apply(T t); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java new file mode 100644 index 0000000..e1fc821 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi.dsl.functions; + +import java.io.Serializable; + +/** + * Function that map a value of type T to value of type R + * + * @param <T> Input value type + * @param <R> Output value type + */ +public interface MapFunction<T, R> extends Serializable { + R apply(T t); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java new file mode 100644 index 0000000..2bcac60 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.javaapi.dsl.functions; + +import java.io.Serializable; + +/** + * Function that applies reduce operation + * + * @param <T> Input value type + */ +public interface ReduceFunction<T> extends Serializable { + T apply(T t1, T t2); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/streaming/src/main/resources/geardefault.conf b/streaming/src/main/resources/geardefault.conf index 219543e..a484f3c 100644 --- a/streaming/src/main/resources/geardefault.conf +++ b/streaming/src/main/resources/geardefault.conf @@ -21,11 +21,11 @@ gearpump { ### Whitelist for Metrics Aggregator class. ### See class [[MetricsAggregator]] for more information. metrics-aggregator-class { - "io.gearpump.streaming.metrics.TaskFilterAggregator" = "" - "io.gearpump.streaming.metrics.ProcessorAggregator" = "" + "org.apache.gearpump.streaming.metrics.TaskFilterAggregator" = "" + "org.apache.gearpump.streaming.metrics.ProcessorAggregator" = "" } } } - transport.serializer = "io.gearpump.streaming.task.StreamingTransportSerializer" + transport.serializer = "org.apache.gearpump.streaming.task.StreamingTransportSerializer" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala deleted file mode 100644 index 06cf022..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming - -import scala.language.existentials - -import akka.actor.ActorRef - -import io.gearpump.TimeStamp -import io.gearpump.cluster.appmaster.WorkerInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations -import io.gearpump.streaming.task.{Subscriber, TaskId} -import io.gearpump.transport.HostPort - -object AppMasterToExecutor { - case class LaunchTasks( - taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription, - subscribers: List[Subscriber]) - - case object TasksLaunched - - /** - * dagVersion, life, and subscribers will be changed on target task list. - */ - case class ChangeTasks( - taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) - - case class TasksChanged(taskIds: List[TaskId]) - - case class ChangeTask( - taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) - - case class TaskChanged(taskId: TaskId, dagVersion: Int) - - case class StartTask(taskId: TaskId) - - case class StopTask(taskId: TaskId) - - case class TaskLocationsReady(taskLocations: TaskLocations, dagVersion: Int) - - case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId) - - case class TaskLocationsRejected( - dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable) - - case class StartAllTasks(dagVersion: Int) - - case class StartDynamicDag(dagVersion: Int) - case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp) - case class TaskRejected(taskId: TaskId) - - case object RestartClockService - class MsgLostException extends Exception -} - -object ExecutorToAppMaster { - case class RegisterExecutor( - executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo) - - case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort) - case class UnRegisterTask(taskId: TaskId, executorId: Int) - - case class MessageLoss(executorId: Int, taskId: TaskId, cause: String) -} - -object AppMasterToMaster { - case class StallingTasks(tasks: List[TaskId]) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala deleted file mode 100644 index becf31a..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming - -object Constants { - - val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" - val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source" - val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink" - val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" - - val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" - - val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = "gearpump.streaming.register-task-timeout-ms" - - val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT = - "gearpump.streaming.max-pending-message-count-per-connection" - - val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = - "gearpump.streaming.ack-once-every-message-count" -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/DAG.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala deleted file mode 100644 index a73fd48..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming - -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.task.TaskId -import io.gearpump.util.Graph - -/** - * DAG is wrapper for [[io.gearpump.util.Graph]] for streaming applications. - */ -case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription], - graph : Graph[ProcessorId, PartitionerDescription]) - extends Serializable { - - def isEmpty: Boolean = { - processors.isEmpty - } - - def taskCount: Int = { - processors.foldLeft(0) { (count, task) => - count + task._2.parallelism - } - } - - def tasks: List[TaskId] = { - processors.flatMap { pair => - val (processorId, processor) = pair - (0 until processor.parallelism).map(TaskId(processorId, _)) - }.toList - } -} - -object DAG { - def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = { - val processors = graph.vertices.map { processorDescription => - (processorDescription.id, processorDescription) - }.toMap - val dag = graph.mapVertex { processor => - processor.id - } - new DAG(version, processors, dag) - } - - def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription]) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala deleted file mode 100644 index 8eb866d..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming - -import java.io.{DataInput, DataOutput} - -import io.gearpump.streaming.task._ - -class TaskIdSerializer extends TaskMessageSerializer[TaskId] { - override def getLength(obj: TaskId): Int = 8 - - override def write(dataOutput: DataOutput, obj: TaskId): Unit = { - dataOutput.writeInt(obj.processorId) - dataOutput.writeInt(obj.index) - } - - override def read(dataInput: DataInput): TaskId = { - val processorId = dataInput.readInt() - val index = dataInput.readInt() - new TaskId(processorId, index) - } -} - -class AckSerializer extends TaskMessageSerializer[Ack] { - val taskIdSerializer = new TaskIdSerializer - - override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 8 - - override def write(dataOutput: DataOutput, obj: Ack): Unit = { - taskIdSerializer.write(dataOutput, obj.taskId) - dataOutput.writeShort(obj.seq) - dataOutput.writeShort(obj.actualReceivedNum) - dataOutput.writeInt(obj.sessionId) - } - - override def read(dataInput: DataInput): Ack = { - val taskId = taskIdSerializer.read(dataInput) - val seq = dataInput.readShort() - val actualReceivedNum = dataInput.readShort() - val sessionId = dataInput.readInt() - Ack(taskId, seq, actualReceivedNum, sessionId) - } -} - -class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckRequest] { - val taskIdSerialzer = new TaskIdSerializer() - - override def getLength(obj: InitialAckRequest): Int = taskIdSerialzer.getLength(obj.taskId) + 4 - - override def write(dataOutput: DataOutput, obj: InitialAckRequest): Unit = { - taskIdSerialzer.write(dataOutput, obj.taskId) - dataOutput.writeInt(obj.sessionId) - } - - override def read(dataInput: DataInput): InitialAckRequest = { - val taskId = taskIdSerialzer.read(dataInput) - val sessionId = dataInput.readInt() - InitialAckRequest(taskId, sessionId) - } -} - -class AckRequestSerializer extends TaskMessageSerializer[AckRequest] { - val taskIdSerializer = new TaskIdSerializer - - override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6 - - override def write(dataOutput: DataOutput, obj: AckRequest): Unit = { - taskIdSerializer.write(dataOutput, obj.taskId) - dataOutput.writeShort(obj.seq) - dataOutput.writeInt(obj.sessionId) - } - - override def read(dataInput: DataInput): AckRequest = { - val taskId = taskIdSerializer.read(dataInput) - val seq = dataInput.readShort() - val sessionId = dataInput.readInt() - AckRequest(taskId, seq, sessionId) - } -} - -class LatencyProbeSerializer extends TaskMessageSerializer[LatencyProbe] { - override def getLength(obj: LatencyProbe): Int = 8 - - override def write(dataOutput: DataOutput, obj: LatencyProbe): Unit = { - dataOutput.writeLong(obj.timestamp) - } - - override def read(dataInput: DataInput): LatencyProbe = { - val timestamp = dataInput.readLong() - LatencyProbe(timestamp) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala deleted file mode 100644 index 0ca4d92..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming - -import scala.language.implicitConversions -import scala.reflect.ClassTag - -import akka.actor.ActorSystem - -import io.gearpump.TimeStamp -import io.gearpump.cluster._ -import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} -import io.gearpump.streaming.appmaster.AppMaster -import io.gearpump.streaming.task.Task -import io.gearpump.util.{Graph, LogUtil, ReferenceEqual} - -/** - * Processor is the blueprint for tasks. - */ -trait Processor[+T <: Task] extends ReferenceEqual { - - /** - * How many tasks you want to use for this processor. - */ - def parallelism: Int - - /** - * The custom [[io.gearpump.cluster.UserConfig]], it is used to initialize a task in runtime. - */ - def taskConf: UserConfig - - /** - * Some description text for this processor. - */ - def description: String - - /** - * The task class, should be a subtype of Task. - * - * Each runtime instance of this class is a task. - */ - def taskClass: Class[_ <: Task] -} - -object Processor { - def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task]) - : ProcessorDescription = { - import processor._ - ProcessorDescription(id, taskClass.getName, parallelism, description, taskConf) - } - - def apply[T<: Task]( - parallelism : Int, description: String = "", - taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T]) - : DefaultProcessor[T] = { - new DefaultProcessor[T](parallelism, description, taskConf, - classtag.runtimeClass.asInstanceOf[Class[T]]) - } - - def apply[T<: Task]( - taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig) - : DefaultProcessor[T] = { - new DefaultProcessor[T](parallelism, description, taskConf, taskClazz) - } - - case class DefaultProcessor[T<: Task]( - parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T]) - extends Processor[T] { - - def withParallelism(parallel: Int): DefaultProcessor[T] = { - new DefaultProcessor[T](parallel, description, taskConf, taskClass) - } - - def withDescription(desc: String): DefaultProcessor[T] = { - new DefaultProcessor[T](parallelism, desc, taskConf, taskClass) - } - - def withConfig(conf: UserConfig): DefaultProcessor[T] = { - new DefaultProcessor[T](parallelism, description, conf, taskClass) - } - } -} - -/** - * Each processor has a LifeTime. - * - * When input message's timestamp is beyond current processor's lifetime, - * then it will not be processed by this processor. - */ -case class LifeTime(birth: TimeStamp, death: TimeStamp) { - def contains(timestamp: TimeStamp): Boolean = { - timestamp >= birth && timestamp < death - } - - def cross(another: LifeTime): LifeTime = { - LifeTime(Math.max(birth, another.birth), Math.min(death, another.death)) - } -} - -object LifeTime { - val Immortal = LifeTime(0L, Long.MaxValue) -} - -/** - * Represent a streaming application - */ -class StreamApplication( - override val name: String, val inputUserConfig: UserConfig, - val dag: Graph[ProcessorDescription, PartitionerDescription]) - extends Application { - - require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") - - override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster] - override def userConfig(implicit system: ActorSystem): UserConfig = { - inputUserConfig.withValue(StreamApplication.DAG, dag) - } -} - -case class ProcessorDescription( - id: ProcessorId, - taskClass: String, - parallelism : Int, - description: String = "", - taskConf: UserConfig = null, - life: LifeTime = LifeTime.Immortal, - jar: AppJar = null) extends ReferenceEqual - -object StreamApplication { - - private val hashPartitioner = new HashPartitioner() - private val LOG = LogUtil.getLogger(getClass) - - def apply[T <: Processor[Task], P <: Partitioner]( - name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = { - import io.gearpump.streaming.Processor._ - - if (dag.hasCycle()) { - LOG.warn(s"Detected cycles in DAG of application $name!") - } - - val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap - val graph = dag.mapVertex { processor => - val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor) - updatedProcessor - }.mapEdge { (node1, edge, node2) => - PartitionerDescription(new PartitionerObject( - Option(edge).getOrElse(StreamApplication.hashPartitioner))) - } - new StreamApplication(name, userConfig, graph) - } - - val DAG = "DAG" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala deleted file mode 100644 index df85017..0000000 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import java.lang.management.ManagementFactory -import scala.concurrent.Future - -import akka.actor._ -import org.slf4j.Logger - -import io.gearpump._ -import io.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication} -import io.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure} -import io.gearpump.cluster._ -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.metrics.Metrics.ReportMetrics -import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask} -import io.gearpump.streaming._ -import io.gearpump.streaming.appmaster.AppMaster._ -import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor} -import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo} -import io.gearpump.streaming.appmaster.TaskManager.{FailedToRecover, GetTaskList, TaskList} -import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} -import io.gearpump.streaming.storage.InMemoryAppStoreOnMaster -import io.gearpump.streaming.task._ -import io.gearpump.streaming.util.ActorPathUtil -import io.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _} -import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig -import io.gearpump.util._ - -/** - * AppMaster is the head of a streaming application. - * - * It contains: - * 1. ExecutorManager to manage all executors. - * 2. TaskManager to manage all tasks, - * 3. ClockService to track the global clock for this streaming application. - * 4. Scheduler to decide which a task should be scheduled to. - */ -class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster { - import app.userConfig - import appContext.{appId, masterProxy, username} - - private implicit val actorSystem = context.system - private implicit val timeOut = FUTURE_TIMEOUT - - import akka.pattern.ask - private implicit val dispatcher = context.dispatcher - - private val startTime: TimeStamp = System.currentTimeMillis() - - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx") - LOG.info(s"AppMaster actor path: ${ActorUtil.getFullPath(context.system, self.path)}") - - private val address = ActorUtil.getFullPath(context.system, self.path) - - private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy) - private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store, - Some(getUpdatedDAG())))) - - private var taskManager: Option[ActorRef] = None - private var clockService: Option[ActorRef] = None - private val systemConfig = context.system.settings.config - private var lastFailure = LastFailure(0L, null) - - private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, - self.path.toString, - Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active") - - private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) - - private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) - - private val userDir = System.getProperty("user.dir") - private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config) - - private val appMasterExecutorSummary = ExecutorSummary( - APPMASTER_DEFAULT_EXECUTOR_ID, - Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), - self.path.toString, - logFile.getAbsolutePath, - status = "Active", - taskCount = 0, - tasks = Map.empty[ProcessorId, List[TaskId]], - jvmName = ManagementFactory.getRuntimeMXBean().getName() - ) - - private val historyMetricsService = if (metricsEnabled) { - // Registers jvm metrics - Metrics(context.system).register(new JvmMetricsSet( - s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}")) - - val historyMetricsService = context.actorOf(Props(new HistoryMetricsService( - s"app$appId", getHistoryMetricsConfig))) - - val metricsReportService = context.actorOf(Props( - new MetricsReporterService(Metrics(context.system)))) - historyMetricsService.tell(ReportMetrics, metricsReportService) - - Some(historyMetricsService) - } else { - None - } - - private val executorManager: ActorRef = - context.actorOf(ExecutorManager.props(userConfig, appContext, app.clusterConfig, app.name), - ActorPathUtil.executorManagerActorName) - - for (dag <- getDAG) { - clockService = Some(context.actorOf(Props(new ClockService(dag, store)))) - val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context) - - taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager, - jarScheduler, executorManager, clockService.get, self, app.name)))) - } - - override def receive: Receive = { - taskMessageHandler orElse - executorMessageHandler orElse - recover orElse - appMasterService orElse - ActorUtil.defaultMsgHandler(self) - } - - /** Handles messages from Tasks */ - def taskMessageHandler: Receive = { - case clock: ClockEvent => - taskManager.foreach(_ forward clock) - case register: RegisterTask => - taskManager.foreach(_ forward register) - case unRegister: UnRegisterTask => - taskManager.foreach(_ forward unRegister) - // Checks whether this processor dead, if it is, then we should remove it from clockService. - clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId)) - case replay: ReplayFromTimestampWindowTrailingEdge => - taskManager.foreach(_ forward replay) - case messageLoss: MessageLoss => - lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause) - taskManager.foreach(_ forward messageLoss) - case lookupTask: LookupTaskActorRef => - taskManager.foreach(_ forward lookupTask) - case checkpoint: ReportCheckpointClock => - clockService.foreach(_ forward checkpoint) - case GetDAG => - val task = sender - getDAG.foreach { - dag => task ! dag - } - case GetCheckpointClock => - clockService.foreach(_ forward GetCheckpointClock) - } - - /** Handles messages from Executors */ - def executorMessageHandler: Receive = { - case register: RegisterExecutor => - executorManager forward register - case ReportMetrics => - historyMetricsService.foreach(_ forward ReportMetrics) - } - - /** Handles messages from AppMaster */ - def appMasterService: Receive = { - case appMasterDataDetailRequest: AppMasterDataDetailRequest => - LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ") - - val executorsFuture = executorBrief - val clockFuture = getMinClock - val taskFuture = getTaskList - val dagFuture = getDAG - - val appMasterDataDetail = for { - executors <- executorsFuture - clock <- clockFuture - tasks <- taskFuture - dag <- dagFuture - } yield { - val graph = dag.graph - - val executorToTasks = tasks.tasks.groupBy(_._2).mapValues { - _.keys.toList - } - - val processors = dag.processors.map { kv => - val processor = kv._2 - import processor._ - val tasks = executorToTasks.map { kv => - (kv._1, TaskCount(kv._2.count(_.processorId == id))) - }.filter(_._2.count != 0) - (id, - ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, - tasks.keys.toList, tasks)) - } - - StreamAppMasterSummary( - appId = appId, - appName = app.name, - actorPath = address, - clock = clock, - status = MasterToAppMaster.AppMasterActive, - startTime = startTime, - uptime = System.currentTimeMillis() - startTime, - user = username, - homeDirectory = userDir, - logFile = logFile.getAbsolutePath, - processors = processors, - processorLevels = graph.vertexHierarchyLevelMap(), - dag = graph.mapEdge { (node1, edge, node2) => - edge.partitionerFactory.name - }, - executors = executors, - historyMetricsConfig = getHistoryMetricsConfig - ) - } - - val client = sender() - - appMasterDataDetail.map { appData => - client ! appData - } - // TODO: WebSocket is buggy and disabled. - // case appMasterMetricsRequest: AppMasterMetricsRequest => - // val client = sender() - // actorSystem.eventStream.subscribe(client, classOf[MetricType]) - case query: QueryHistoryMetrics => - if (historyMetricsService.isEmpty) { - // Returns empty metrics so that we don't hang the UI - sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) - } else { - historyMetricsService.get forward query - } - case getStalling: GetStallingTasks => - clockService.foreach(_ forward getStalling) - case replaceDAG: ReplaceProcessor => - dagManager forward replaceDAG - case GetLastFailure(_) => - sender ! lastFailure - case get@GetExecutorSummary(executorId) => - val client = sender - if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) { - client ! appMasterExecutorSummary - } else { - ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) - .map { map => - map.get(executorId).foreach { executor => - executor.executor.tell(get, client) - } - } - } - case query@QueryExecutorConfig(executorId) => - val client = sender - if (executorId == -1) { - val systemConfig = context.system.settings.config - sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) - } else { - ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) - .map { map => - map.get(executorId).foreach { executor => - executor.executor.tell(query, client) - } - } - } - } - - /** Error handling */ - def recover: Receive = { - case FailedToRecover(errorMsg) => - if (context.children.toList.contains(sender())) { - LOG.error(errorMsg) - masterProxy ! ShutdownApplication(appId) - } - case AllocateResourceTimeOut => - LOG.error(s"Failed to allocate resource in time, shutdown application $appId") - masterProxy ! ShutdownApplication(appId) - context.stop(self) - } - - private def getMinClock: Future[TimeStamp] = { - clockService match { - case Some(clockService) => - (clockService ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock) - case None => - Future.failed(new ServiceNotAvailableException("clock service not ready")) - } - } - - private def executorBrief: Future[List[ExecutorBrief]] = { - ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo) - .map { infos => - infos.values.map { info => - ExecutorBrief(info.executorId, - info.executor.path.toSerializationFormat, - info.worker.workerId, - "active") - }.toList :+ appMasterBrief - } - } - - private def getTaskList: Future[TaskList] = { - taskManager match { - case Some(taskManager) => - (taskManager ? GetTaskList).asInstanceOf[Future[TaskList]] - case None => - Future.failed(new ServiceNotAvailableException("task manager not ready")) - } - } - - private def getDAG: Future[DAG] = { - (dagManager ? GetLatestDAG).asInstanceOf[Future[LatestDAG]].map(_.dag) - } - - private def getUpdatedDAG(): DAG = { - val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, - PartitionerDescription]](StreamApplication.DAG).get) - val updated = dag.processors.map { idAndProcessor => - val (id, oldProcessor) = idAndProcessor - val newProcessor = if (oldProcessor.jar == null) { - oldProcessor.copy(jar = appContext.appJar.getOrElse(null)) - } else { - oldProcessor - } - (id, newProcessor) - } - DAG(dag.version, updated, dag.graph) - } -} - -object AppMaster { - - /** Master node doesn't return resource in time */ - case object AllocateResourceTimeOut - - /** Query task ActorRef by providing the taskId */ - case class LookupTaskActorRef(taskId: TaskId) - - case class TaskActorRef(task: ActorRef) - - class ServiceNotAvailableException(reason: String) extends Exception(reason) - - case class ExecutorBrief( - executorId: ExecutorId, executor: String, workerId: WorkerId, status: String) - -} \ No newline at end of file
