http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala deleted file mode 100644 index 1877651..0000000 --- a/core/src/main/scala/io/gearpump/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io - -package object gearpump { - type TimeStamp = Long - val LatestTime = -1 -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala deleted file mode 100644 index 0b9c57e..0000000 --- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.partitioner - -import io.gearpump.Message - -/** Used by storm module to broadcast message to all downstream tasks */ -class BroadcastPartitioner extends MulticastPartitioner { - private var lastPartitionNum = -1 - private var partitions = Array.empty[Int] - - override def getPartitions( - msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { - if (partitionNum != lastPartitionNum) { - partitions = (0 until partitionNum).toArray - lastPartitionNum = partitionNum - } - partitions - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala deleted file mode 100644 index 062fc10..0000000 --- a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.partitioner - -import io.gearpump.Message - -/** - * Will have the same parallelism with last processor - * And each task in current processor will co-locate with task of last processor - */ -class CoLocationPartitioner extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - currentPartitionId - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala deleted file mode 100644 index 6ba0cd6..0000000 --- a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.partitioner - -import io.gearpump.Message - -/** - * Only make sense when the message has implemented the hashCode() - * Otherwise, it will use Object.hashCode(), which will not return - * same hash code after serialization and deserialization. 
- */ -class HashPartitioner extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala deleted file mode 100644 index 69104c7..0000000 --- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.partitioner - -import scala.reflect.ClassTag - -import org.apache.commons.lang.SerializationUtils - -import io.gearpump.Message - -/** - * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task - * of upstream processor A send to several tasks of downstream processor B. 
- */ -sealed trait Partitioner extends Serializable - -/** - * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does - * ONE-task {@literal ->} ONE-task mapping. - */ -trait UnicastPartitioner extends Partitioner { - - /** - * Gets the SINGLE downstream processor task index to send message to. - * - * @param msg Message you want to send - * @param partitionNum How many tasks does the downstream processor have. - * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. - * - * @return ONE task index of downstream processor. - */ - def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int - - def getPartition(msg: Message, partitionNum: Int): Int = { - getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) - } -} - -trait MulticastPartitioner extends Partitioner { - - /** - * Gets a list of downstream processor task indexes to send message to. - * - * @param upstreamTaskIndex Current sender task's task index. - * - */ - def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] - - def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { - getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) - } -} - -sealed trait PartitionerFactory { - - def name: String - - def partitioner: Partitioner -} - -/** Stores the Partitioner in an object. 
To use it, user need to deserialize the object */ -class PartitionerObject(private[this] val _partitioner: Partitioner) - extends PartitionerFactory with Serializable { - - override def name: String = partitioner.getClass.getName - - override def partitioner: Partitioner = { - SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] - } -} - -/** Store the partitioner in class Name, the user need to instantiate a new class */ -class PartitionerByClassName(partitionerClass: String) - extends PartitionerFactory with Serializable { - - override def name: String = partitionerClass - override def partitioner: Partitioner = { - Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] - } -} - -/** - * @param partitionerFactory How we construct a Partitioner. - */ -case class PartitionerDescription(partitionerFactory: PartitionerFactory) - -object Partitioner { - val UNKNOWN_PARTITION_ID = -1 - - def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = { - PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala deleted file mode 100644 index ff962fa..0000000 --- a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.partitioner - -import scala.util.Random - -import io.gearpump.Message - -/** - * The idea of ShuffleGroupingPartitioner is derived from Storm. - * Messages are randomly distributed across the downstream's tasks in a way such that - * each task is guaranteed to get an equal number of messages. - */ -class ShuffleGroupingPartitioner extends UnicastPartitioner { - private val random = new Random - private var index = -1 - private var partitions = List.empty[Int] - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - index += 1 - if (partitions.isEmpty) { - partitions = 0.until(partitionNum).toList - partitions = random.shuffle(partitions) - } else if (index >= partitionNum) { - index = 0 - partitions = random.shuffle(partitions) - } - partitions(index) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala deleted file mode 100644 index 6b3c26e..0000000 --- a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.partitioner - -import java.util.Random - -import io.gearpump.Message - -/** - * Round Robin partition the data to downstream processor tasks. - */ -class ShufflePartitioner extends UnicastPartitioner { - private var seed = 0 - private var count = 0 - - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - - if (seed == 0) { - seed = newSeed() - } - - val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum - count = count + 1 - result - } - - private def newSeed(): Int = new Random().nextInt() -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/Authenticator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/security/Authenticator.scala b/core/src/main/scala/io/gearpump/security/Authenticator.scala deleted file mode 100644 index 73bc8e1..0000000 --- a/core/src/main/scala/io/gearpump/security/Authenticator.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.security -import scala.concurrent.{ExecutionContext, Future} - -import io.gearpump.security.Authenticator.AuthenticationResult - -/** - * Authenticator for UI dashboard. - * - * Sub Class must implement a constructor with signature like this: - * this(config: Config) - */ -trait Authenticator { - - // TODO: Change the signature to return more attributes of user credentials... 
- def authenticate( - user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] -} - -object Authenticator { - - trait AuthenticationResult { - - def authenticated: Boolean - - def permissionLevel: Int - } - - val UnAuthenticated = new AuthenticationResult { - override val authenticated = false - override val permissionLevel = -1 - } - - /** Guest can view but have no permission to submit app or write */ - val Guest = new AuthenticationResult { - override val authenticated = true - override val permissionLevel = 1000 - } - - /** User can submit app, kill app, but have no permission to add or remote machines */ - val User = new AuthenticationResult { - override val authenticated = true - override val permissionLevel = 1000 + Guest.permissionLevel - } - - /** Super user */ - val Admin = new AuthenticationResult { - override val authenticated = true - override val permissionLevel = 1000 + User.permissionLevel - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala deleted file mode 100644 index 0743a3f..0000000 --- a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.security - -import scala.concurrent.{ExecutionContext, Future} - -import com.typesafe.config.Config - -import io.gearpump.security.Authenticator.AuthenticationResult -import io.gearpump.security.ConfigFileBasedAuthenticator._ - -object ConfigFileBasedAuthenticator { - - private val ROOT = "gearpump.ui-security.config-file-based-authenticator" - private val ADMINS = ROOT + "." + "admins" - private val USERS = ROOT + "." + "users" - private val GUESTS = ROOT + "." + "guests" - - private case class Credentials( - admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) { - - def verify(user: String, password: String): AuthenticationResult = { - if (admins.contains(user)) { - if (verify(user, password, admins)) { - Authenticator.Admin - } else { - Authenticator.UnAuthenticated - } - } else if (users.contains(user)) { - if (verify(user, password, users)) { - Authenticator.User - } else { - Authenticator.UnAuthenticated - } - } else if (guests.contains(user)) { - if (verify(user, password, guests)) { - Authenticator.Guest - } else { - Authenticator.UnAuthenticated - } - } else { - Authenticator.UnAuthenticated - } - } - - private def verify(user: String, password: String, map: Map[String, String]): Boolean = { - val storedPass = map(user) - PasswordUtil.verify(password, storedPass) - } - } -} - -/** - * UI dashboard authenticator based on configuration file. - * - * It has three categories of users: admins, users, and guests. 
- * admins have unlimited permission, like shutdown a cluster, add/remove machines. - * users have limited permission to submit an application and etc.. - * guests can not submit/kill applications, but can view the application status. - * - * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find - * information about how to configure this authenticator. - * - * [Security consideration] - * It will keep one-way sha1 digest of password instead of password itself. The original password is - * NOT kept in any way, so generally it is safe. - * - * - * digesting flow (from original password to digest): - * {{{ - * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> - * base64Encode. - * }}} - * - * Verification user input password with stored digest: - * {{{ - * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest: - * salt + sha1 -> compare the generated digest with the stored digest. - * }}} - */ -class ConfigFileBasedAuthenticator(config: Config) extends Authenticator { - - private val credentials = loadCredentials(config) - - override def authenticate(user: String, password: String, ec: ExecutionContext) - : Future[AuthenticationResult] = { - implicit val ctx = ec - Future { - credentials.verify(user, password) - } - } - - private def loadCredentials(config: Config): Credentials = { - val admins = configToMap(config, ADMINS) - val users = configToMap(config, USERS) - val guests = configToMap(config, GUESTS) - new Credentials(admins, users, guests) - } - - private def configToMap(config: Config, path: String) = { - import scala.collection.JavaConverters._ - config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/security/PasswordUtil.scala ---------------------------------------------------------------------- 
diff --git a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala deleted file mode 100644 index 9bf40d2..0000000 --- a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.security - -import java.security.MessageDigest -import scala.util.Try - -import sun.misc.{BASE64Decoder, BASE64Encoder} - -/** - * Util to verify whether user input password is valid or not. - * It use sha1 to do the digesting. - */ -object PasswordUtil { - private val SALT_LENGTH = 8 - - /** - * Verifies user input password with stored digest: - * {{{ - * base64Decode -> extract salt -> do sha1(salt, password) -> - * generate digest: salt + sha1 -> compare the generated digest with the stored digest. 
- * }}} - */ - def verify(password: String, stored: String): Boolean = { - Try { - val decoded = base64Decode(stored) - val salt = new Array[Byte](SALT_LENGTH) - Array.copy(decoded, 0, salt, 0, SALT_LENGTH) - - hash(password, salt) == stored - }.getOrElse(false) - } - /** - * digesting flow (from original password to digest): - * {{{ - * random salt byte array of length 8 -> - * byte array of (salt + sha1(salt, password)) -> base64Encode - * }}} - */ - def hash(password: String): String = { - // Salt generation 64 bits long - val salt = new Array[Byte](SALT_LENGTH) - new java.util.Random().nextBytes(salt) - hash(password, salt) - } - - private def hash(password: String, salt: Array[Byte]): String = { - val digest = MessageDigest.getInstance("SHA-1") - digest.reset() - digest.update(salt) - var input = digest.digest(password.getBytes("UTF-8")) - digest.reset() - input = digest.digest(input) - val withSalt = salt ++ input - base64Encode(withSalt) - } - - private def base64Encode(data: Array[Byte]): String = { - val endecoder = new BASE64Encoder() - endecoder.encode(data) - } - - private def base64Decode(data: String): Array[Byte] = { - val decoder = new BASE64Decoder() - decoder.decodeBuffer(data) - } - - // scalastyle:off println - private def help() = { - Console.println("usage: gear io.gearpump.security.PasswordUtil -password <your password>") - } - - def main(args: Array[String]): Unit = { - if (args.length != 2 || args(0) != "-password") { - help() - } else { - val pass = args(1) - val result = hash(pass) - Console.println("Here is the hashed password") - Console.println("==============================") - Console.println(result) - } - } - // scalastyle:on println -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala deleted file mode 100644 index cb9d563..0000000 --- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.serializer - -import akka.actor.ExtendedActorSystem - -import io.gearpump.cluster.UserConfig - -/** - * A build-in serializer framework using kryo - * - * NOTE: The Kryo here is a shaded version by Gearpump - */ -class FastKryoSerializationFramework extends SerializationFramework { - private var system: ExtendedActorSystem = null - - private lazy val pool = new ThreadLocal[Serializer]() { - override def initialValue(): Serializer = { - new FastKryoSerializer(system) - } - } - - override def init(system: ExtendedActorSystem, config: UserConfig): Unit = { - this.system = system - } - - override def get(): Serializer = { - pool.get() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala deleted file mode 100644 index 57b7b5e..0000000 --- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.serializer - -import akka.actor.ExtendedActorSystem - -import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy -import io.gearpump.objenesis.strategy.StdInstantiatorStrategy -import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper -import io.gearpump.serializer.FastKryoSerializer.KryoSerializationException -import io.gearpump.util.LogUtil - -class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer { - - private val LOG = LogUtil.getLogger(getClass) - private val config = system.settings.config - - private val kryoSerializer = new KryoSerializerWrapper(system) - private val kryo = kryoSerializer.kryo - val strategy = new DefaultInstantiatorStrategy - strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy) - kryo.setInstantiatorStrategy(strategy) - private val kryoClazz = new GearpumpSerialization(config).customize(kryo) - - override def serialize(message: Any): Array[Byte] = { - try { - kryoSerializer.toBinary(message) - } catch { - case ex: java.lang.IllegalArgumentException => - val clazz = message.getClass - val error = s""" - | ${ex.getMessage} - |You can also register the class by providing a configuration with serializer - |defined, - | - |gearpump{ - | serializers { - | ## Follow this format when adding new serializer for new message types - | # "yourpackage.YourClass" = "yourpackage.YourSerializerForThisClass" - | - | ## If you intend to use default serializer for this class, then you can write this - | # "yourpackage.YourClass" = "" - | } - |} - | - |If you want to register the serializer globally, you need to change - |gear.conf on every worker in the cluster; if you only want to register - |the serializer for a single streaming application, you need to create - |a file under conf/ named application.conf, and add the above configuration - |into application.conf. 
To verify whether the configuration is effective, - |you can browser your UI http://{UI Server Host}:8090/api/v1.0/app/{appId}/config, - |and check whether your custom serializer is added. - """.stripMargin - - LOG.error(error, ex) - throw new KryoSerializationException(error, ex) - } - } - - override def deserialize(msg: Array[Byte]): Any = { - kryoSerializer.fromBinary(msg) - } -} - -object FastKryoSerializer { - class KryoSerializationException(msg: String, ex: Throwable = null) extends Exception(msg, ex) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala deleted file mode 100644 index a7eb6cf..0000000 --- a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package io.gearpump.serializer

import com.typesafe.config.Config
import org.slf4j.Logger

import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import io.gearpump.util.{Constants, LogUtil}

/**
 * Applies the serializer registrations found under `Constants.GEARPUMP_SERIALIZERS`
 * of the given config to a Kryo instance.
 *
 * @param config configuration holding the "class name" -> "serializer class name" table
 */
class GearpumpSerialization(config: Config) {

  private val LOG: Logger = LogUtil.getLogger(getClass)

  /**
   * Registers every configured class with `kryo`, then disables reference tracking
   * and makes class registration mandatory.
   *
   * An entry whose value is null or empty is registered with Kryo's default
   * serializer. Otherwise the value is loaded as a class, instantiated through its
   * no-arg constructor and registered as the serializer for the key class.
   *
   * @param kryo the Kryo instance to configure; it is mutated in place
   */
  def customize(kryo: Kryo): Unit = {

    val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS)

    serializationMap.foreach { case (key, value) =>
      val keyClass = Class.forName(key)

      if (value == null || value.isEmpty) {

        // Use default serializer for this class type
        kryo.register(keyClass)
      } else {
        val valueClass = Class.forName(value)
        val register = kryo.register(keyClass,
          valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
        LOG.debug(s"Registering ${keyClass}, id: ${register.getId}")
      }
    }
    kryo.setReferences(false)

    // Requires the user to register the class first before using
    kryo.setRegistrationRequired(true)
  }

  /**
   * Flattens the config object at `path` into a map of key -> string value.
   *
   * A config value of `null` unwraps to a Java null. It is passed through as null
   * rather than dereferenced with `toString` (which used to throw a
   * NullPointerException), so that `customize` can treat it as "use the default
   * serializer".
   */
  private final def configToMap(config: Config, path: String): Map[String, String] = {
    import scala.collection.JavaConverters._
    config.getConfig(path).root.unwrapped.asScala.toMap.map { case (k, v) =>
      k -> Option(v).map(_.toString).orNull
    }
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala deleted file mode 100644 index 4947dcc..0000000 --- a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package io.gearpump.serializer

import akka.actor.ExtendedActorSystem

import io.gearpump.cluster.UserConfig

/**
 * Users are allowed to plug in a customized serialization framework by
 * implementing this interface.
 */
trait SerializationFramework {

  /**
   * Initializes the framework.
   *
   * @param system the actor system the framework is initialized with
   * @param config the user configuration passed to the framework
   */
  def init(system: ExtendedActorSystem, config: UserConfig): Unit

  /**
   * Gets a serializer to use.
   *
   * This method must be thread safe: it can be called in a multi-threaded
   * environment, and it is the responsibility of the SerializationFramework
   * developer to assure that.
   *
   * One recommendation to achieve thread safety is to use a thread local pool
   * that keeps a reference to the Serializer of each thread.
   *
   * @return the serializer to use
   */
  def get(): Serializer
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/serializer/Serializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/Serializer.scala b/core/src/main/scala/io/gearpump/serializer/Serializer.scala deleted file mode 100644 index ff8b147..0000000 --- a/core/src/main/scala/io/gearpump/serializer/Serializer.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package io.gearpump.serializer

/**
 * Contract for a user defined message serializer: turns a message into bytes
 * and bytes back into a message.
 */
trait Serializer {

  /**
   * @param message the message to encode
   * @return the binary form of `message`
   */
  def serialize(message: Any): Array[Byte]

  /**
   * @param msg the binary form of a message
   * @return the decoded message
   */
  def deserialize(msg: Array[Byte]): Any
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/Express.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/Express.scala b/core/src/main/scala/io/gearpump/transport/Express.scala deleted file mode 100644 index 101b841..0000000 --- a/core/src/main/scala/io/gearpump/transport/Express.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package io.gearpump.transport

import scala.collection.immutable.LongMap
import scala.concurrent._

import akka.actor._
import akka.agent.Agent
import org.slf4j.Logger

import io.gearpump.transport.netty.Client.Close
import io.gearpump.transport.netty.{Context, TaskMessage}
import io.gearpump.util.LogUtil

trait ActorLookupById {

  /** Finds the local task actor registered under `id` (a TaskId turned into a Long). */
  def lookupLocalActor(id: Long): Option[ActorRef]
}

/**
 * Custom networking layer.
 *
 * Long sender/receiver addresses are translated to shorter ones to reduce
 * the network overhead.
 */
class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {

  import system.dispatcher

  import io.gearpump.transport.Express._
  val localActorMap = Agent(LongMap.empty[ActorRef])
  val remoteAddressMap = Agent(Map.empty[Long, HostPort])

  val remoteClientMap = Agent(Map.empty[HostPort, ActorRef])

  val conf = system.settings.config

  lazy val (context, serverPort, localHost) = init

  /**
   * Lazily creates the Netty context, binds the "netty-server" server and arranges for
   * the context to be closed when the actor system terminates.
   */
  lazy val init = {
    LOG.info(s"Start Express init ...${system.name}")
    val nettyContext = new Context(system, conf)
    val boundPort = nettyContext.bind("netty-server", this)
    val localAddress = HostPort(system.provider.getDefaultAddress.host.get, boundPort)
    LOG.info(s"binding to netty server $localAddress")

    system.registerOnTermination(nettyContext.close())
    (nettyContext, boundPort, localAddress)
  }

  /** Asynchronously removes the local actor registered under `id`. */
  def unregisterLocalActor(id: Long): Unit = {
    localActorMap.sendOff(registered => registered - id)
  }

  /**
   * Starts Netty client actors for the given remote machines. Clients whose address is
   * not part of `hostPorts` are closed first.
   */
  def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
    val obsolete = remoteClientMap.get().keySet.filterNot(hostPorts.contains)
    closeClients(obsolete)

    val nothingStarted = Future(Map.empty[HostPort, ActorRef])
    hostPorts.toList.foldLeft(nothingStarted) { (_, target) =>
      remoteClientMap.alter { clients =>
        if (clients.contains(target)) {
          clients
        } else {
          clients + (target -> context.connect(target))
        }
      }
    }
  }

  /** Sends `Close` to the clients of the given addresses and drops them from the map. */
  def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
    remoteClientMap.alter { clients =>
      clients.foreach { case (address, client) =>
        if (hostPorts.contains(address)) {
          client ! Close
        }
      }
      clients -- hostPorts
    }
  }

  /** Registers a local task actor under `id`; forces the lazy Netty initialization. */
  def registerLocalActor(id: Long, actor: ActorRef): Unit = {
    LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
    init
    localActorMap.sendOff(registered => registered + (id -> actor))
  }

  def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)

  def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)

  /**
   * Sends a message to a remote task.
   *
   * Throws an Exception when no client has been started for `remote`.
   */
  def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
    remoteClientMap.get.get(remote) match {
      case Some(client) =>
        client.tell(taskMessage, Actor.noSender)
      case None =>
        val errorMsg = s"Clients has not been launched properly before transporting messages, " +
          s"the destination is $remote"
        LOG.error(errorMsg)
        throw new Exception(errorMsg)
    }
  }
}

/** A customized transport layer by using Akka extension */
object Express extends ExtensionId[Express] with ExtensionIdProvider {
  val LOG: Logger = LogUtil.getLogger(getClass)

  override def get(system: ActorSystem): Express = super.get(system)

  override def lookup: ExtensionId[Express] = Express

  override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/HostPort.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/HostPort.scala b/core/src/main/scala/io/gearpump/transport/HostPort.scala deleted file mode 100644 index 
40c4342..0000000 --- a/core/src/main/scala/io/gearpump/transport/HostPort.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package io.gearpump.transport

/**
 * A network endpoint.
 *
 * @param host host name or address
 * @param port port number
 */
case class HostPort(host: String, port: Int) {

  /** Returns this endpoint as a `(host, port)` pair. */
  def toTuple: (String, Int) = (host, port)
}

object HostPort {

  /**
   * Parses an endpoint written as `host:port`.
   *
   * The text is split on ':'; the first segment is the host, the second the port and
   * any further segments are ignored.
   *
   * @param address text in the form `host:port`
   * @return the parsed endpoint
   * @throws IllegalArgumentException when `address` has no port segment, or when the
   *         port is not a valid integer (NumberFormatException)
   */
  def apply(address: String): HostPort = {
    address.split(":") match {
      case Array(host, port, _*) =>
        new HostPort(host, port.toInt)
      case _ =>
        // Fail with the offending input instead of an ArrayIndexOutOfBoundsException
        throw new IllegalArgumentException(
          s"Invalid address '$address', expected format is host:port")
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Client.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Client.scala b/core/src/main/scala/io/gearpump/transport/netty/Client.scala deleted file mode 100644 index d5960ad..0000000 --- a/core/src/main/scala/io/gearpump/transport/netty/Client.scala +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package io.gearpump.transport.netty

import java.net.{ConnectException, InetSocketAddress}
import java.nio.channels.ClosedChannelException
import java.util
import java.util.Random
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.language.implicitConversions

import akka.actor.Actor
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
import org.slf4j.Logger

import io.gearpump.transport.HostPort
import io.gearpump.util.LogUtil

/**
 * Netty Client implemented as an actor, on the other side, there is a netty server Actor.
 * All messages sent to this actor will be forwarded to remote machine.
 *
 * Incoming TaskMessages are collected in `batch` and written to the channel whenever a
 * `Flush` message for the current channel is handled and the channel is writable.
 */
class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
  import io.gearpump.transport.netty.Client._

  val name = s"netty-client-$hostPort"

  private final var bootstrap: ClientBootstrap = null
  private final val random: Random = new Random
  private val serializer = conf.newTransportSerializer
  private var channel: Channel = null

  var batch = new util.ArrayList[TaskMessage]

  // Runs at construction time: creates the bootstrap and triggers the first connect attempt.
  private val init = {
    bootstrap = NettyUtil.createClientBootStrap(factory,
      new ClientPipelineFactory(name, conf), conf.buffer_size)
    self ! Connect(0)
  }

  def receive: Receive = messageHandler orElse connectionHandler

  /** Buffers task messages and flushes the buffer to the current channel. */
  def messageHandler: Receive = {
    case msg: TaskMessage =>
      batch.add(msg)
    case flush@Flush(flushChannel) =>
      if (channel != flushChannel) {
        () // Drop, as it belongs to an old channel's flush message
      } else if (batch.size > 0 && flushChannel.isWritable) {
        send(flushChannel, batch.iterator)
        batch.clear()
        self ! flush
      } else {
        // Nothing to send or channel not writable: check again after the flush interval
        import context.dispatcher
        context.system.scheduler.scheduleOnce(
          new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
      }
  }

  /** Handles connection life cycle messages (ready, connect, reconnect, close). */
  def connectionHandler: Receive = {
    case ChannelReady(channel) =>
      this.channel = channel
      self ! Flush(channel)
    case Connect(tries) =>
      if (null == channel) {
        connect(tries)
      } else {
        LOG.error("there already exist a channel, will not establish a new one...")
      }
    case CompareAndReconnectIfEqual(oldChannel) =>
      if (oldChannel == channel) {
        channel = null
        self ! Connect(0)
      }
    case Close =>
      close()
      context.become(closed)
  }

  /** Behavior after `Close`: every message is logged and dropped. */
  def closed: Receive = {
    case msg: AnyRef =>
      LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
  }

  /**
   * Tries to connect to the remote host. On failure another `Connect` is scheduled after
   * a back-off delay; once `tries` exceeds `conf.max_retries` the client closes itself.
   */
  private def connect(tries: Int): Unit = {
    LOG.info(s"netty client try to connect to $name, tries: $tries")
    if (tries <= conf.max_retries) {
      val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
      val future = bootstrap.connect(remote_addr)
      future success { current =>
        LOG.info(s"netty client successfully connectted to $name, tries: $tries")
        self ! ChannelReady(current)
      } fail { (current, ex) =>
        LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
        current.close()
        import context.dispatcher
        context.system.scheduler.scheduleOnce(
          new FiniteDuration(
            getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
      }
    } else {
      LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
      self ! Close
    }
  }

  /** Packs the messages into MessageBatches and writes each full (or final) batch. */
  private def send(flushChannel: Channel, msgs: util.Iterator[TaskMessage]): Unit = {
    var messageBatch: MessageBatch = null

    while (msgs.hasNext) {
      val message: TaskMessage = msgs.next()
      if (null == messageBatch) {
        messageBatch = new MessageBatch(conf.messageBatchSize, serializer)
      }
      messageBatch.add(message)
      if (messageBatch.isFull) {
        val toBeFlushed: MessageBatch = messageBatch
        flushRequest(flushChannel, toBeFlushed)
        messageBatch = null
      }
    }
    if (null != messageBatch && !messageBatch.isEmpty) {
      flushRequest(flushChannel, messageBatch)
    }
  }

  /** Closes the channel (if any) and releases the message buffer. */
  private def close(): Unit = {
    LOG.info(s"closing netty client $name...")
    if (null != channel) {
      channel.close()
      channel = null
    }
    batch = null
  }

  override def postStop(): Unit = {
    close()
  }

  /** Writes one batch; on failure closes the channel and asks for a reconnect. */
  private def flushRequest(channel: Channel, requests: MessageBatch): Unit = {
    val future: ChannelFuture = channel.write(requests)
    future.fail { (channel, ex) =>
      if (channel.isOpen) {
        channel.close
      }
      LOG.error(s"failed to send requests " +
        s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
      if (!ex.isInstanceOf[ClosedChannelException]) {
        LOG.error(ex.getMessage, ex)
      }
      self ! CompareAndReconnectIfEqual(channel)
    }
  }

  /**
   * Computes the delay before the next connect attempt: `conf.base_sleep_ms` times a
   * random factor in [1, 2^retries), capped at `conf.max_sleep_ms`.
   */
  private def getSleepTimeMs(retries: Int): Long = {
    if (retries > 30) {
      conf.max_sleep_ms
    } else {
      val backoff = 1 << retries
      // Multiply as Long: base_sleep_ms times a factor of up to 2^30 overflows an Int,
      // which would yield a negative delay.
      val sleepMs = conf.base_sleep_ms.toLong * Math.max(1, random.nextInt(backoff))
      if (sleepMs < conf.max_sleep_ms) sleepMs else conf.max_sleep_ms
    }
  }

  private def isChannelWritable = (null != channel) && channel.isWritable
}

/** Messages understood by [[Client]] plus the Netty pipeline pieces it installs. */
object Client {
  val LOG: Logger = LogUtil.getLogger(getClass)

  // Reconnect if current channel equals channel
  case class CompareAndReconnectIfEqual(channel: Channel)

  case class Connect(tries: Int)
  case class ChannelReady(chanel: Channel)
  case object Close

  case class Flush(channel: Channel)

  /** Upstream handler that only logs exceptions raised on the client channel. */
  class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {

    override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent): Unit = {
      event.getCause match {
        case ex: ConnectException => () // ignored here
        case ex: ClosedChannelException =>
          // "{}" is required, otherwise SLF4J silently drops the extra argument
          LOG.warn("exception found when trying to close netty connection: {}", ex.getMessage)
        case ex => LOG.error("Connection failed " + name, ex)
      }
    }
  }

  /** Builds the client pipeline: message decoder, message encoder, error handler. */
  class ClientPipelineFactory(name: String, conf: NettyConfig) extends ChannelPipelineFactory {
    def getPipeline: ChannelPipeline = {
      val pipeline: ChannelPipeline = Channels.pipeline
      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
      pipeline.addLast("encoder", new MessageEncoder)
      pipeline.addLast("handler", new ClientErrorHandler(name))
      pipeline
    }
  }

  implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
    new ChannelFutureOps(channel)
  }

  /** Adds `success` / `fail` callbacks to a Netty ChannelFuture. */
  class ChannelFutureOps(channelFuture: ChannelFuture) {

    /** Runs `handler` with the channel when the future completes successfully. */
    def success(handler: (Channel => Unit)): ChannelFuture = {
      channelFuture.addListener(new ChannelFutureListener {
        def operationComplete(future: ChannelFuture): Unit = {
          if (future.isSuccess) {
            handler(future.getChannel)
          }
        }
      })
      channelFuture
    }

    /** Runs `handler` with the channel and the cause when the future does not succeed. */
    def fail(handler: ((Channel, Throwable) => Unit)): ChannelFuture = {
      channelFuture.addListener(new ChannelFutureListener {
        def operationComplete(future: ChannelFuture): Unit = {
          if (!future.isSuccess) {
            handler(future.getChannel, future.getCause)
          }
        }
      })
      channelFuture
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Context.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Context.scala b/core/src/main/scala/io/gearpump/transport/netty/Context.scala deleted file mode 100644 index 9a9ee29..0000000 --- a/core/src/main/scala/io/gearpump/transport/netty/Context.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package io.gearpump.transport.netty

import java.io.Closeable
import java.util.concurrent._
import scala.collection.JavaConverters._

import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.config.Config
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.slf4j.Logger

import io.gearpump.transport.netty.Server.ServerPipelineFactory
import io.gearpump.transport.{ActorLookupById, HostPort}
import io.gearpump.util.{Constants, LogUtil}

object Context {
  private final val LOG: Logger = LogUtil.getLogger(getClass)
}

/**
 * Netty Context.
 *
 * Creates the server and client actors and remembers, in creation order, how to
 * release every resource it handed out; `close()` releases them in reverse order.
 */
class Context(system: ActorSystem, conf: NettyConfig) extends IContext {
  import io.gearpump.transport.netty.Context._

  def this(system: ActorSystem, conf: Config) = this(system, new NettyConfig(conf))

  private val closeHandler = new ConcurrentLinkedQueue[Closeable]()
  private val nettyDispatcher = system.settings.config.getString(Constants.NETTY_DISPATCHER)
  val maxWorkers: Int = 1

  /** Queues `cleanup` to be run by `close()`. */
  private def onClose(cleanup: => Unit): Unit = {
    closeHandler.add(new Closeable {
      override def close(): Unit = cleanup
    })
  }

  // Created on first connect(); its release is queued at that moment.
  private lazy val clientChannelFactory: NioClientSocketChannelFactory = {
    val bossThreads: ThreadFactory = new NettyRenameThreadFactory("client-boss")
    val workerThreads: ThreadFactory = new NettyRenameThreadFactory("client-worker")
    val sharedFactory =
      new NioClientSocketChannelFactory(
        Executors.newCachedThreadPool(bossThreads),
        Executors.newCachedThreadPool(workerThreads), maxWorkers)

    onClose {
      LOG.info("Closing all client resources....")
      sharedFactory.releaseExternalResources
    }
    sharedFactory
  }

  /**
   * Starts a Server actor named `name` and a Netty server that feeds it.
   *
   * @return the port the Netty server is bound to
   */
  def bind(
      name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true,
      inputPort: Int = 0): Int = {
    // TODO: whether we should expose it as application config?
    val serverProps = Props(classOf[Server], name, conf, lookupActor, deserializeFlag)
      .withDispatcher(nettyDispatcher)
    val server = system.actorOf(serverProps, name)
    val (port, channel) = NettyUtil.newNettyServer(name,
      new ServerPipelineFactory(server, conf), 5242880, inputPort)
    val factory = channel.getFactory
    onClose {
      system.stop(server)
      channel.close()
      LOG.info("Closing all server resources....")
      factory.releaseExternalResources
    }
    port
  }

  /** Starts a Client actor that connects to `hostPort`. */
  def connect(hostPort: HostPort): ActorRef = {
    val clientProps = Props(classOf[Client], conf, clientChannelFactory, hostPort)
      .withDispatcher(nettyDispatcher)
    val client = system.actorOf(clientProps)
    onClose {
      LOG.info("closing Client actor....")
      system.stop(client)
    }

    client
  }

  /**
   * terminate this context
   */
  def close(): Unit = {

    LOG.info(s"Context.term, cleanup resources...., " +
      s"we have ${closeHandler.size()} items to close...")

    // Cleans up resource in reverse order so that client actor can be cleaned
    // before clientChannelFactory
    for (resource <- closeHandler.iterator().asScala.toList.reverse) {
      resource.close()
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/IContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala deleted file mode 100644 index 56b2f7c..0000000 --- a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package io.gearpump.transport.netty

import akka.actor.ActorRef

import io.gearpump.transport.{ActorLookupById, HostPort}

/** Factory for Netty server/client endpoints together with their cleanup. */
trait IContext {

  /**
   * Create a Netty server connection.
   *
   * @param name name of the server
   * @param lookupActor used to look up local task actors by id
   * @param deserializeFlag flag handed to the server being created
   * @param port port to bind to
   * @return an Int result; NOTE(review): presumably the bound port - confirm against
   *         the implementation
   */
  def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int

  /**
   * Create a Netty client actor
   *
   * @param hostPort the remote endpoint to connect to
   * @return the client actor
   */
  def connect(hostPort: HostPort): ActorRef

  /**
   * Close resource for this context
   */
  def close(): Unit
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala deleted file mode 100644 index a62eff5..0000000 --- a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package io.gearpump.transport.netty

import com.typesafe.config.Config

import io.gearpump.util.Constants

/**
 * Typed view over the Netty related settings of a config. All integer settings are
 * read eagerly when the instance is created.
 */
class NettyConfig(conf: Config) {

  val buffer_size: Int = conf.getInt(Constants.NETTY_BUFFER_SIZE)
  val max_retries: Int = conf.getInt(Constants.NETTY_MAX_RETRIES)
  val base_sleep_ms: Int = conf.getInt(Constants.NETTY_BASE_SLEEP_MS)
  val max_sleep_ms: Int = conf.getInt(Constants.NETTY_MAX_SLEEP_MS)
  val messageBatchSize: Int = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE)
  val flushCheckInterval: Int = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL)

  /**
   * Creates a new transport serializer on every call: the class named by
   * `Constants.GEARPUMP_TRANSPORT_SERIALIZER` is loaded and instantiated through its
   * no-arg constructor.
   */
  def newTransportSerializer: ITransportMessageSerializer = {
    val serializerClassName = conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER)
    val serializerClass = Class.forName(serializerClassName)
    serializerClass.newInstance().asInstanceOf[ITransportMessageSerializer]
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala deleted file mode 100644 index 3e746af..0000000 --- a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package io.gearpump.transport.netty

import java.net.InetSocketAddress
import java.util.concurrent.{Executors, ThreadFactory}

import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory}

/** Helpers that assemble Netty server and client bootstraps. */
object NettyUtil {

  /**
   * Creates a Netty server with one worker and binds it to `inputPort`.
   *
   * @param name prefix for the boss/worker thread factory names
   * @param pipelineFactory pipeline installed on the server bootstrap
   * @param buffer_size value of the "child.receiveBufferSize" option
   * @param inputPort port to bind to (default 0)
   * @return the local port of the bound channel together with the channel itself
   */
  def newNettyServer(
      name: String,
      pipelineFactory: ChannelPipelineFactory,
      buffer_size: Int,
      inputPort: Int = 0): (Int, Channel) = {
    val bossThreads: ThreadFactory = new NettyRenameThreadFactory(name + "-boss")
    val workerThreads: ThreadFactory = new NettyRenameThreadFactory(name + "-worker")
    val channelFactory = new NioServerSocketChannelFactory(
      Executors.newCachedThreadPool(bossThreads),
      Executors.newCachedThreadPool(workerThreads),
      1)

    val serverChannel: Channel =
      createServerBootStrap(channelFactory, pipelineFactory, buffer_size)
        .bind(new InetSocketAddress(inputPort))
    val localAddress = serverChannel.getLocalAddress().asInstanceOf[InetSocketAddress]
    (localAddress.getPort(), serverChannel)
  }

  /**
   * Builds a server bootstrap with the child options tcpNoDelay, receiveBufferSize
   * (`buffer_size`) and keepAlive set, using the given pipeline factory.
   */
  def createServerBootStrap(
      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
    : ServerBootstrap = {
    val server = new ServerBootstrap(factory)
    server.setOption("child.tcpNoDelay", true)
    server.setOption("child.receiveBufferSize", buffer_size)
    server.setOption("child.keepAlive", true)
    server.setPipelineFactory(pipelineFactory)
    server
  }

  /**
   * Builds a client bootstrap with the options tcpNoDelay, sendBufferSize
   * (`buffer_size`) and keepAlive set, using the given pipeline factory.
   */
  def createClientBootStrap(
      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
    : ClientBootstrap = {
    val client = new ClientBootstrap(factory)
    client.setOption("tcpNoDelay", true)
    client.setOption("sendBufferSize", buffer_size)
    client.setOption("keepAlive", true)
    client.setPipelineFactory(pipelineFactory)
    client
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/transport/netty/Server.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala deleted file mode 100644 index 9a9d79b..0000000 --- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump.transport.netty

import java.util
import scala.collection.JavaConverters._
import scala.collection.immutable.IntMap
import scala.concurrent.Future

import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
import org.slf4j.Logger

import io.gearpump.transport.ActorLookupById
import io.gearpump.util.{AkkaHelper, LogUtil}

/**
 * Netty server actor; every received message is forwarded to the local actor that
 * `lookupActor` resolves for the message's target task.
 *
 * @param name        used as the logging context
 * @param lookupActor resolves a task id to a local actor, if one exists
 */
class Server(
    name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean)
  extends Actor {

  private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name)
  import io.gearpump.transport.netty.Server._

  /** All channels announced through [[Server.AddChannel]]; closed together in `postStop`. */
  val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server")

  val system = context.system.asInstanceOf[ExtendedActorSystem]

  def receive: Receive = msgHandler orElse channelManager

  // As we will only transfer TaskId on the wire,
  // this object will translate taskId to or from ActorRef
  private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context)

  /** Tracks channels as they connect and closes them on request. */
  def channelManager: Receive = {
    case AddChannel(channel) => allChannels.add(channel)
    case CloseChannel(channel) =>
      import context.dispatcher
      // Closing waits for completion, so it runs on the dispatcher instead of inside the
      // actor's own message handling.
      Future {
        channel.close.awaitUninterruptibly
        allChannels.remove(channel)
      }
  }

  /** Groups a batch by target task and delivers each group to the task's local actor. */
  def msgHandler: Receive = {
    case MsgBatch(msgs) =>
      msgs.asScala.groupBy(_.targetTask()).foreach { case (taskId, taskMessages) =>
        lookupActor.lookupLocalActor(taskId) match {
          case Some(target) =>
            taskMessages.foreach { taskMessage =>
              // The sender is the placeholder ActorRef derived from the message's session id.
              target.tell(taskMessage.message(),
                taskIdActorRefTranslation.translateToActorRef(taskMessage.sessionId()))
            }
          case None =>
            // Messages for an unknown task are dropped after logging.
            LOG.error(s"Cannot find actor for id: $taskId...")
        }
      }
  }

  override def postStop(): Unit = {
    allChannels.close.awaitUninterruptibly
  }
}

object Server {

  /** Builds the decoder -> encoder -> handler pipeline for each accepted connection. */
  class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory {
    def getPipeline: ChannelPipeline = {
      val pipeline: ChannelPipeline = Channels.pipeline
      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
      pipeline.addLast("encoder", new MessageEncoder)
      pipeline.addLast("handler", new ServerHandler(server))
      pipeline
    }
  }

  /** Upstream handler that turns Netty channel events into messages for the `server` actor. */
  class ServerHandler(server: ActorRef) extends SimpleChannelUpstreamHandler {
    private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = server.path.name)

    override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
      server ! AddChannel(e.getChannel)
    }

    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
      // A null payload is ignored; anything else is expected to be the decoded message list.
      Option(e.getMessage.asInstanceOf[util.List[TaskMessage]]).foreach { msgs =>
        server ! MsgBatch(msgs)
      }
    }

    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
      LOG.error("server errors in handling the request", e.getCause)
      server ! CloseChannel(e.getChannel)
    }
  }

  /**
   * Caches one placeholder ActorRef per session id.
   *
   * The cache is a plain `var` with no synchronization of its own.
   * NOTE(review): in the code visible here it is only reached from `Server.msgHandler` -
   * confirm no other thread calls it.
   */
  class TaskIdActorRefTranslation(context: ActorContext) {
    private var taskIdtoActorRef = IntMap.empty[ActorRef]

    /** 1-1 mapping from session id to fake ActorRef */
    def translateToActorRef(sessionId: Int): ActorRef = {
      taskIdtoActorRef.get(sessionId) match {
        case Some(cached) => cached
        case None =>
          // A fake ActorRef for performance optimization.
          val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
          taskIdtoActorRef += sessionId -> actorRef
          actorRef
      }
    }
  }

  /** Asks the server to start tracking `channel`. */
  case class AddChannel(channel: Channel)

  /** Asks the server to close `channel` and stop tracking it. */
  case class CloseChannel(channel: Channel)

  /** A batch of decoded task messages received from one channel read. */
  case class MsgBatch(messages: java.lang.Iterable[TaskMessage])

}
package io.gearpump.util

import java.util.concurrent.{TimeUnit, TimeoutException}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

import akka.actor._
import com.typesafe.config.Config
import org.slf4j.Logger

import io.gearpump.cluster.ClusterConfig
import io.gearpump.util.LogUtil.ProcessType

/**
 * ActorSystemBooter start a new JVM process to boot an actor system.
 * All executors are started by ActorSystemBooter
 *
 * It send the system address to "report back actor"
 */
class ActorSystemBooter(config: Config) {
  import io.gearpump.util.ActorSystemBooter._

  /**
   * Starts an actor system called `name` and creates its "daemon" actor.
   *
   * @param name            actor system name, also handed to the daemon
   * @param reportBackActor actor path the daemon registers itself with
   * @return the started actor system
   */
  def boot(name: String, reportBackActor: String): ActorSystem = {
    val system = ActorSystem(name, config)
    // Daemon path: http://{system}@{ip}:{port}/daemon
    system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon")
    system
  }
}

object ActorSystemBooter {

  def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config)

  /**
   * Process entry point.
   *
   * @param args `args(0)` is the actor system name, `args(1)` the report-back actor path
   * @throws IllegalArgumentException when fewer than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 2,
      "Usage: ActorSystemBooter <actor system name> <report back actor path>")
    val name = args(0)
    val reportBack = args(1)
    val config = ClusterConfig.default()

    LogUtil.loadConfiguration(config, ProcessType.APPLICATION)

    val debugPort = Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT))
    debugPort.foreach { port =>
      val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
      LOG.info("==========================================")
      LOG.info("Remote debug port: " + port)
      LOG.info("==========================================")
    }

    val system = apply(config).boot(name, reportBack)

    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run(): Unit = {
        val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
        LOG.info("Maybe we have received a SIGINT signal from parent process, " +
          "start to cleanup resources....")
        system.terminate()
      }
    })

    // Keep the main thread alive until the actor system has terminated.
    Await.result(system.whenTerminated, Duration.Inf)
  }

  case class BindLifeCycle(actor: ActorRef)
  case class CreateActor(prop: Props, name: String)
  case class ActorCreated(actor: ActorRef, name: String)
  case class CreateActorFailed(name: String, reason: Throwable)

  case class RegisterActorSystem(systemPath: String)

  /**
   * This actor system will watch for parent,
   * If parent dies, this will also die
   */
  case class ActorSystemRegistered(bindLifeWith: ActorRef)
  case class RegisterActorSystemFailed(reason: Throwable)

  object RegisterActorSystemTimeOut

  /**
   * Root daemon of a booted actor system.
   *
   * On construction it sends [[RegisterActorSystem]] to `reportBack` and schedules a
   * [[RegisterActorSystemFailed]] to itself after 25 seconds. Once registered it watches
   * the registering actor and serves [[CreateActor]] / [[BindLifeCycle]] requests. When the
   * daemon stops, `postStop` terminates the whole actor system.
   */
  class Daemon(val name: String, reportBack: String) extends Actor {
    val LOG: Logger = LogUtil.getLogger(getClass, context = name)

    val username = Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined")
    LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username")

    val reportBackActor = context.actorSelection(reportBack)
    reportBackActor ! RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString)

    implicit val executionContext = context.dispatcher
    // Self-addressed failure that fires unless it is cancelled by a registration reply.
    val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS),
      self, RegisterActorSystemFailed(new TimeoutException))

    // The initial behavior is the registration phase. Returning it here replaces the former
    // `receive = null` plus constructor-time `context.become`, so no null behavior can be
    // installed.
    def receive: Receive = waitForRegisterResult

    /** Registration phase: waits for the reply from the report-back actor, or the timeout. */
    def waitForRegisterResult: Receive = {
      case ActorSystemRegistered(parent) =>
        timeout.cancel()
        context.watch(parent)
        context.become(waitCommand)
      case RegisterActorSystemFailed(ex) =>
        LOG.error("RegisterActorSystemFailed", ex)
        timeout.cancel()
        context.stop(self)
    }

    /** Command phase: creates child actors on request and stops when a watched actor dies. */
    def waitCommand: Receive = {
      case BindLifeCycle(actor) =>
        LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor")
        context.watch(actor)
      case CreateActor(props: Props, actorName: String) =>
        LOG.info(s"creating actor $actorName")
        Try(context.actorOf(props, actorName)) match {
          case Success(created) =>
            sender() ! ActorCreated(created, actorName)
          case Failure(e) =>
            // The failure reply carries the actor's class name rather than the requested name.
            sender() ! CreateActorFailed(props.clazz.getName, e)
        }
      // NOTE(review): Akka normally processes PoisonPill itself before user behavior runs -
      // confirm whether this case is reachable.
      case PoisonPill =>
        context.stop(self)
      case Terminated(actor) =>
        LOG.info(s"System $name Watched actor is terminated $actor")
        context.stop(self)
    }

    override def postStop(): Unit = {
      LOG.info(s"ActorSystem $name is shutting down...")
      context.system.terminate()
    }
  }
}
package io.gearpump.util

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

import akka.actor.Actor.Receive
import akka.actor._
import akka.pattern.ask
import org.slf4j.Logger

import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.MasterToAppMaster.WorkerList
import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult}
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems}
import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import io.gearpump.cluster.worker.WorkerId
import io.gearpump.transport.HostPort

/** Helpers for actor paths, addresses and ask-style requests to master/appmaster/worker. */
object ActorUtil {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  /** Returns the default address of `system`'s actor-ref provider. */
  def getSystemAddress(system: ActorSystem): Address = {
    system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
  }

  /** Renders `path` with the default address of `system` included. */
  def getFullPath(system: ActorSystem, path: ActorPath): String = {
    path.toStringWithAddress(getSystemAddress(system))
  }

  /** Host part of the actor's address, or "localhost" when the address has no host. */
  def getHostname(actor: ActorRef): String = {
    actor.path.address.host.getOrElse("localhost")
  }

  /** Catch-all handler that logs the class of any message it receives. */
  def defaultMsgHandler(actor: ActorRef): Receive = {
    case msg: Any =>
      LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor")
  }

  /** Logs the actor tree by reflectively invoking the system's `printTree` method. */
  def printActorSystemTree(system: ActorSystem): Unit = {
    val printTree = system.getClass.getDeclaredMethod("printTree")
    printTree.setAccessible(true)
    LOG.info(printTree.invoke(system).asInstanceOf[String])
  }

  /** Checks whether a actor is child actor by simply examining name */
  // TODO: fix this, we should also check the path to root besides name
  def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = {
    // A null child is never a child actor.
    Option(child).exists(c => parent.path.name == c.path.parent.name)
  }

  def actorNameForExecutor(appId: Int, executorId: Int): String =
    s"app${appId}-executor${executorId}"

  // TODO: Currently we explicitly require the master contacts to be started with this path pattern
  // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER
  def getMasterActorPath(master: HostPort): ActorPath = {
    import io.gearpump.util.Constants.MASTER
    ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER")
  }

  /**
   * Asks `master` for all workers, then tells it to start executor systems with one
   * `Resource(1)` request pinned to each worker; `sender` is used as the message sender.
   *
   * The call returns immediately. If the worker query fails, the failure is logged instead
   * of being silently discarded.
   */
  def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
      sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = {
    implicit val timeout = Constants.FUTURE_TIMEOUT

    val launched = (master ? GetAllWorkers).mapTo[WorkerList].map { list =>
      val resources = list.workers.map { workerId =>
        ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)
      }.toArray

      master.tell(StartExecutorSystems(resources, executorJvmConfig), sender)
    }
    launched.failed.foreach { ex =>
      LOG.error("Failed to launch executor systems on workers", ex)
    }
  }

  /**
   * Resolves the appmaster of `appId` through `master`, then asks it `msg`.
   * The returned future fails if resolution fails or the ask fails.
   */
  def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext)
    : Future[T] = {
    askActor[ResolveAppIdResult](master, ResolveAppId(appId))
      .flatMap(result => Future.fromTry(result.appMaster))
      .flatMap(askActor[T](_, msg))
  }

  /**
   * Resolves the worker `workerId` through `master`, then asks it `msg`.
   * The returned future fails if resolution fails or the ask fails.
   */
  def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext)
    : Future[T] = {
    askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId))
      .flatMap(result => Future.fromTry(result.worker))
      .flatMap(askActor[T](_, msg))
  }

  /**
   * Asks `actor` with `Constants.FUTURE_TIMEOUT`. The reply is cast to `T` without a
   * runtime check, so a reply of another type only fails where the value is used.
   */
  def askActor[T](actor: ActorRef, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
    implicit val timeout = Constants.FUTURE_TIMEOUT
    (actor ? msg).asInstanceOf[Future[T]]
  }
}

// Originally a separate file in the same package:
// core/src/main/scala/io/gearpump/util/AkkaApp.scala

/**
 * A Main class helper to load Akka configuration automatically.
 */
trait AkkaApp {

  type Config = com.typesafe.config.Config

  /** Application body; receives the configuration returned by `akkaConfig`. */
  def main(akkaConf: Config, args: Array[String]): Unit

  /** Called before a failure of `main` is rethrown. */
  def help(): Unit

  protected def akkaConfig: Config = {
    ClusterConfig.default()
  }

  def main(args: Array[String]): Unit = {
    Try {
      main(akkaConfig, args)
    }.failed.foreach { ex => help(); throw ex }
  }
}
