http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Context.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Context.scala b/core/src/main/scala/io/gearpump/transport/netty/Context.scala index fac190e..9a9ee29 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/Context.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/Context.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,30 +20,26 @@ package io.gearpump.transport.netty import java.io.Closeable import java.util.concurrent._ +import scala.collection.JavaConverters._ import akka.actor.{ActorRef, ActorSystem, Props} import com.typesafe.config.Config -import io.gearpump.transport.netty.Server.ServerPipelineFactory -import io.gearpump.transport.{ActorLookupById, HostPort} -import io.gearpump.util.{Constants, LogUtil} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.slf4j.Logger -import scala.collection.JavaConversions._ -import scala.language.implicitConversions +import io.gearpump.transport.netty.Server.ServerPipelineFactory +import io.gearpump.transport.{ActorLookupById, HostPort} +import io.gearpump.util.{Constants, LogUtil} object Context { private final val LOG: Logger = LogUtil.getLogger(getClass) - - implicit def toCloseable(fun : () => Any) = new Closeable { - override def close = fun() - } } -class Context(system : ActorSystem, conf: NettyConfig) extends IContext { -import io.gearpump.transport.netty.Context._ +/** Netty Context */ +class Context(system: ActorSystem, conf: NettyConfig) extends IContext { + import io.gearpump.transport.netty.Context._ - def this(system : ActorSystem, conf : Config) { + def this(system: ActorSystem, conf: Config) { this(system, new NettyConfig(conf)) } @@ -54,52 +50,64 @@ import io.gearpump.transport.netty.Context._ private lazy val clientChannelFactory: NioClientSocketChannelFactory = { val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-boss") val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-worker") - val channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers) - closeHandler.add { ()=> - - LOG.info("Closing all client resources....") - channelFactory.releaseExternalResources - } + val channelFactory = + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(bossFactory), + Executors.newCachedThreadPool(workerFactory), maxWorkers) + + closeHandler.add(new Closeable { + override def close(): Unit = { + LOG.info("Closing all client resources....") + channelFactory.releaseExternalResources + } + }) channelFactory } - - def bind(name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true, inputPort: Int = 0): Int = { - //TODO: whether we should expose it as application config? - val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor, deserializeFlag).withDispatcher(nettyDispatcher), name) + def bind( + name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true, + inputPort: Int = 0): Int = { + // TODO: whether we should expose it as application config? + val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor, + deserializeFlag).withDispatcher(nettyDispatcher), name) val (port, channel) = NettyUtil.newNettyServer(name, new ServerPipelineFactory(server, conf), 5242880, inputPort) val factory = channel.getFactory - closeHandler.add{ () => + closeHandler.add(new Closeable { + override def close(): Unit = { system.stop(server) channel.close() - LOG.info("Closing all server resources....") factory.releaseExternalResources } + }) port } - def connect(hostPort : HostPort) : ActorRef = { - val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort).withDispatcher(nettyDispatcher)) - closeHandler.add { () => + def connect(hostPort: HostPort): ActorRef = { + val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort) + .withDispatcher(nettyDispatcher)) + closeHandler.add(new Closeable { + override def close(): Unit = { + LOG.info("closing Client actor....") + system.stop(client) + } + }) - LOG.info("closing Client actor....") - system.stop(client) - } client } /** * terminate this context */ - def close { + def close(): Unit = { - LOG.info(s"Context.term, cleanup resources...., we have ${closeHandler.size()} items to close...") + LOG.info(s"Context.term, cleanup resources...., " + + s"we have ${closeHandler.size()} items to close...") - // clean up resource in reverse order so that client actor can be cleaned + // Cleans up resource in reverse order so that client actor can be cleaned // before clientChannelFactory - closeHandler.iterator().toArray.reverse.foreach(_.close()) + closeHandler.iterator().asScala.toList.reverse.foreach(_.close()) } }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/IContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala index e1a5141..56b2f7c 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,24 +19,23 @@ package io.gearpump.transport.netty import akka.actor.ActorRef -import io.gearpump.transport.{HostPort, ActorLookupById} + import io.gearpump.transport.{ActorLookupById, HostPort} trait IContext { /** - * TODO: remove deserializeFlag from interface + * Create a Netty server connection. */ - def bind(name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean, port: Int) : Int + def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int /** - * connect to a remote host - * return a ActorRef which you can send message TaskMessage to.. + * Create a Netty client actor */ def connect(hostPort: HostPort): ActorRef /** * Close resource for this context */ - def close + def close() } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala index ab76d9f..a62eff5 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,6 +19,7 @@ package io.gearpump.transport.netty import com.typesafe.config.Config + import io.gearpump.util.Constants class NettyConfig(conf: Config) { @@ -30,6 +31,9 @@ class NettyConfig(conf: Config) { val messageBatchSize = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE) val flushCheckInterval = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL) - def newTransportSerializer = Class.forName(conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER)). - newInstance().asInstanceOf[ITransportMessageSerializer] + def newTransportSerializer: ITransportMessageSerializer = { + Class.forName( + conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER)) + .newInstance().asInstanceOf[ITransportMessageSerializer] + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala index 40dc9f0..3e746af 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,18 +27,25 @@ import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory} object NettyUtil { - def newNettyServer(name: String, pipelineFactory: ChannelPipelineFactory, buffer_size: Int, inputPort: Int = 0): (Int, Channel) = { + def newNettyServer( + name: String, + pipelineFactory: ChannelPipelineFactory, + buffer_size: Int, + inputPort: Int = 0): (Int, Channel) = { val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-boss") val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-worker") - val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), 1) + val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), + Executors.newCachedThreadPool(workerFactory), 1) val bootstrap = createServerBootStrap(factory, pipelineFactory, buffer_size) val channel: Channel = bootstrap.bind(new InetSocketAddress(inputPort)) - val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort(); + val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort() (port, channel) } - def createServerBootStrap(factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int) = { + def createServerBootStrap( + factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int) + : ServerBootstrap = { val bootstrap = new ServerBootstrap(factory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.receiveBufferSize", buffer_size) @@ -47,7 +54,9 @@ object NettyUtil { bootstrap } - def createClientBootStrap(factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int) = { + def createClientBootStrap( + factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int) + : ClientBootstrap = { val bootstrap = new ClientBootstrap(factory) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("sendBufferSize", buffer_size) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Server.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala index dde0861..9a9d79b 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/Server.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,32 +19,36 @@ package io.gearpump.transport.netty import java.util +import scala.collection.JavaConverters._ +import scala.collection.immutable.IntMap +import scala.concurrent.Future import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem} -import io.gearpump.transport.ActorLookupById -import io.gearpump.util.{LogUtil, AkkaHelper} import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup} import org.slf4j.Logger -import scala.collection.JavaConversions._ -import scala.collection.immutable.{IntMap, LongMap} -import scala.concurrent.Future +import io.gearpump.transport.ActorLookupById +import io.gearpump.util.{AkkaHelper, LogUtil} -class Server(name: String, conf: NettyConfig, lookupActor : ActorLookupById, deserializeFlag : Boolean) extends Actor { - private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name) +/** Netty server actor, message received will be forward to the target on the address line. */ +class Server( + name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean) + extends Actor { + private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name) import io.gearpump.transport.netty.Server._ val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server") val system = context.system.asInstanceOf[ExtendedActorSystem] - def receive = msgHandler orElse channelManager - //As we will only transfer TaskId on the wire, this object will translate taskId to or from ActorRef + def receive: Receive = msgHandler orElse channelManager + // As we will only transfer TaskId on the wire, + // this object will translate taskId to or from ActorRef private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context) - def channelManager : Receive = { + def channelManager: Receive = { case AddChannel(channel) => allChannels.add(channel) case CloseChannel(channel) => import context.dispatcher @@ -54,9 +58,9 @@ class Server(name: String, conf: NettyConfig, lookupActor : ActorLookupById, des } } - def msgHandler : Receive = { + def msgHandler: Receive = { case MsgBatch(msgs) => - msgs.groupBy(_.targetTask()).foreach { taskBatch => + msgs.asScala.groupBy(_.targetTask()).foreach { taskBatch => val (taskId, taskMessages) = taskBatch val actor = lookupActor.lookupLocalActor(taskId) @@ -69,18 +73,13 @@ class Server(name: String, conf: NettyConfig, lookupActor : ActorLookupById, des } } - override def postStop() = { + override def postStop(): Unit = { allChannels.close.awaitUninterruptibly } } object Server { - // Create a 1-1 mapping fake ActorRef for task - // The path is fake, don't use the ActorRef directly. - // As we must use actorFor() which is deprecated, - // according to the advice https://issues.scala-lang.org/browse/SI-7934, - // use a helper object to bypass this deprecation warning. class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val pipeline: ChannelPipeline = Channels.pipeline @@ -114,8 +113,9 @@ object Server { class TaskIdActorRefTranslation(context: ActorContext) { private var taskIdtoActorRef = IntMap.empty[ActorRef] - def translateToActorRef(sessionId : Int): ActorRef = { - if(!taskIdtoActorRef.contains(sessionId)){ + /** 1-1 mapping from session id to fake ActorRef */ + def translateToActorRef(sessionId: Int): ActorRef = { + if (!taskIdtoActorRef.contains(sessionId)) { // A fake ActorRef for performance optimization. val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId") @@ -123,13 +123,12 @@ object Server { } taskIdtoActorRef.get(sessionId).get } - } case class AddChannel(channel: Channel) case class CloseChannel(channel: Channel) - case class MsgBatch(messages: Iterable[TaskMessage]) + case class MsgBatch(messages: java.lang.Iterable[TaskMessage]) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala b/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala index 71fda90..25a34d9 100644 --- a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala +++ b/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,17 @@ package io.gearpump.util -import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.{TimeUnit, TimeoutException} +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} import akka.actor._ import com.typesafe.config.Config -import io.gearpump.cluster.ClusterConfig -import io.gearpump.util.LogUtil.ProcessType import org.slf4j.Logger -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success, Try} +import io.gearpump.cluster.ClusterConfig +import io.gearpump.util.LogUtil.ProcessType /** * ActorSystemBooter start a new JVM process to boot an actor system. @@ -35,23 +36,22 @@ import scala.util.{Failure, Success, Try} * * It send the system address to "report back actor" */ - -class ActorSystemBooter(config : Config) { +class ActorSystemBooter(config: Config) { import io.gearpump.util.ActorSystemBooter._ - def boot(name : String, reportBackActor : String) : ActorSystem = { + def boot(name: String, reportBackActor: String): ActorSystem = { val system = ActorSystem(name, config) - // daemon path: http://{system}@{ip}:{port}/daemon + // Daemon path: http://{system}@{ip}:{port}/daemon system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon") system } } -object ActorSystemBooter { +object ActorSystemBooter { - def apply(config : Config) : ActorSystemBooter = new ActorSystemBooter(config) + def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config) - def main (args: Array[String]) { + def main(args: Array[String]) { val name = args(0) val reportBack = args(1) val config = ClusterConfig.default() @@ -59,7 +59,7 @@ object ActorSystemBooter { LogUtil.loadConfiguration(config, ProcessType.APPLICATION) val debugPort = Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT)) - debugPort.foreach{ port => + debugPort.foreach { port => val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass) LOG.info("==========================================") LOG.info("Remote debug port: " + port) @@ -69,22 +69,23 @@ object ActorSystemBooter { val system = apply(config).boot(name, reportBack) Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() : Unit = { + override def run(): Unit = { val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass) - LOG.info("Maybe we have received a SIGINT signal from parent process, start to cleanup resources....") - system.shutdown() + LOG.info("Maybe we have received a SIGINT signal from parent process, " + + "start to cleanup resources....") + system.terminate() } - }); + }) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } - case class BindLifeCycle(actor : ActorRef) - case class CreateActor(prop : Props, name : String) - case class ActorCreated(actor : ActorRef, name : String) - case class CreateActorFailed(name : String, reason: Throwable) + case class BindLifeCycle(actor: ActorRef) + case class CreateActor(prop: Props, name: String) + case class ActorCreated(actor: ActorRef, name: String) + case class CreateActorFailed(name: String, reason: Throwable) - case class RegisterActorSystem(systemPath : String) + case class RegisterActorSystem(systemPath: String) /** * This actor system will watch for parent, @@ -95,23 +96,24 @@ object ActorSystemBooter { object RegisterActorSystemTimeOut - class Daemon(val name : String, reportBack : String) extends Actor { + class Daemon(val name: String, reportBack: String) extends Actor { val LOG: Logger = LogUtil.getLogger(getClass, context = name) val username = Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined") LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username") - + val reportBackActor = context.actorSelection(reportBack) reportBackActor ! RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString) implicit val executionContext = context.dispatcher - val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS), self, RegisterActorSystemFailed(new TimeoutException)) + val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS), + self, RegisterActorSystemFailed(new TimeoutException)) context.become(waitForRegisterResult) - def receive : Receive = null + def receive: Receive = null - def waitForRegisterResult : Receive = { + def waitForRegisterResult: Receive = { case ActorSystemRegistered(parent) => timeout.cancel() context.watch(parent) @@ -122,11 +124,11 @@ object ActorSystemBooter { context.stop(self) } - def waitCommand : Receive = { + def waitCommand: Receive = { case BindLifeCycle(actor) => LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor") context.watch(actor) - case create @ CreateActor(props : Props, name : String) => + case create@CreateActor(props: Props, name: String) => LOG.info(s"creating actor $name") val actor = Try(context.actorOf(props, name)) actor match { @@ -142,9 +144,9 @@ object ActorSystemBooter { context.stop(self) } - override def postStop : Unit = { + override def postStop(): Unit = { LOG.info(s"ActorSystem $name is shutting down...") - context.system.shutdown() + context.system.terminate() } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala index cc701d8..d5f48a7 100644 --- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,43 +18,44 @@ package io.gearpump.util +import scala.concurrent.{ExecutionContext, Future} + import akka.actor.Actor.Receive import akka.actor._ import akka.pattern.ask -import io.gearpump.WorkerId +import org.slf4j.Logger + import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} import io.gearpump.cluster.MasterToAppMaster.WorkerList import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult} import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems} import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.transport.HostPort -import org.slf4j.Logger - -import scala.concurrent.{ExecutionContext, Future} object ActorUtil { - private val LOG: Logger = LogUtil.getLogger(getClass) + private val LOG: Logger = LogUtil.getLogger(getClass) - def getSystemAddress(system : ActorSystem) : Address = { + def getSystemAddress(system: ActorSystem): Address = { system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress } - def getFullPath(system : ActorSystem, path : ActorPath): String = { + def getFullPath(system: ActorSystem, path: ActorPath): String = { path.toStringWithAddress(getSystemAddress(system)) } - def getHostname(actor : ActorRef): String = { + def getHostname(actor: ActorRef): String = { val path = actor.path path.address.host.getOrElse("localhost") } - def defaultMsgHandler(actor : ActorRef) : Receive = { - case msg : Any => - LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor") + def defaultMsgHandler(actor: ActorRef): Receive = { + case msg: Any => + LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor") } - def printActorSystemTree(system : ActorSystem) : Unit = { + def printActorSystemTree(system: ActorSystem): Unit = { val extendedSystem = system.asInstanceOf[ExtendedActorSystem] val clazz = system.getClass val m = clazz.getDeclaredMethod("printTree") @@ -62,9 +63,9 @@ object ActorUtil { LOG.info(m.invoke(system).asInstanceOf[String]) } - // Check whether a actor is child actor by simply examining name - //TODO: fix this, we should also check the path to root besides name - def isChildActorPath(parent : ActorRef, child : ActorRef) : Boolean = { + /** Checks whether a actor is child actor by simply examining name */ + // TODO: fix this, we should also check the path to root besides name + def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = { if (null != child) { parent.path.name == child.path.parent.name } else { @@ -72,21 +73,18 @@ object ActorUtil { } } - def actorNameForExecutor(appId : Int, executorId : Int) = "app" + appId + "-executor" + executorId + def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + appId + "-executor" + + executorId - /** - * TODO: - * Currently we explicitly require the master contacts to be started with this path pattern - * 'akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER' - * - */ + // TODO: Currently we explicitly require the master contacts to be started with this path pattern + // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER def getMasterActorPath(master: HostPort): ActorPath = { - import Constants.MASTER + import io.gearpump.util.Constants.MASTER ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER") } def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig, - sender: ActorRef)(implicit executor : scala.concurrent.ExecutionContext) = { + sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = { implicit val timeout = Constants.FUTURE_TIMEOUT (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list => @@ -98,10 +96,10 @@ object ActorUtil { } } - - def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext): Future[T] = { + def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext) + : Future[T] = { implicit val timeout = Constants.FUTURE_TIMEOUT - val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result => + val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result => if (result.appMaster.isSuccess) { Future.successful(result.appMaster.get) } else { @@ -111,15 +109,17 @@ object ActorUtil { appmaster.flatMap(askActor[T](_, msg)) } - def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext): Future[T] = { + def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext) + : Future[T] = { implicit val timeout = Constants.FUTURE_TIMEOUT - val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId)).flatMap { result => - if (result.worker.isSuccess) { - Future.successful(result.worker.get) - } else { - Future.failed(result.worker.failed.get) + val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId)) + .flatMap { result => + if (result.worker.isSuccess) { + Future.successful(result.worker.get) + } else { + Future.failed(result.worker.failed.get) + } } - } worker.flatMap(askActor[T](_, msg)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/AkkaApp.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/AkkaApp.scala b/core/src/main/scala/io/gearpump/util/AkkaApp.scala index aee02d5..2b0bf61 100644 --- a/core/src/main/scala/io/gearpump/util/AkkaApp.scala +++ b/core/src/main/scala/io/gearpump/util/AkkaApp.scala @@ -1,9 +1,26 @@ -package io.gearpump.util +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import io.gearpump.cluster.ClusterConfig +package io.gearpump.util import scala.util.Try +import io.gearpump.cluster.ClusterConfig /** * A Main class helper to load Akka configuration automatically. @@ -14,7 +31,7 @@ trait AkkaApp { def main(akkaConf: Config, args: Array[String]): Unit - def help: Unit + def help(): Unit protected def akkaConfig: Config = { ClusterConfig.default() @@ -23,6 +40,6 @@ trait AkkaApp { def main(args: Array[String]): Unit = { Try { main(akkaConfig, args) - }.failed.foreach{ex => help; throw ex} + }.failed.foreach { ex => help(); throw ex } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala index a4fb545..65b7023 100644 --- a/core/src/main/scala/io/gearpump/util/Constants.scala +++ b/core/src/main/scala/io/gearpump/util/Constants.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -44,7 +44,8 @@ object Constants { val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher" val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters" val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout" - val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS = "gearpump.worker.executor-share-same-jvm-as-worker" + val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS = + "gearpump.worker.executor-share-same-jvm-as-worker" val GEARPUMP_HOME = "gearpump.home" val GEARPUMP_FULL_SCALA_VERSION = "gearpump.binary-version-with-scala-version" @@ -57,23 +58,24 @@ object Constants { val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir" val HADOOP_CONF = "hadoopConf" - // Id used to identity Master JVM process in low level resource manager like YARN. // In YARN, it means the container Id. - val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID = "gearpump.master-resource-manager-container-id" + val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID = + "gearpump.master-resource-manager-container-id" // Id used to identity Worker JVM process in low level resource manager like YARN. // In YARN, it means the container Id. - val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID = "gearpump.worker-resource-manager-container-id" + val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID = + "gearpump.worker-resource-manager-container-id" // true or false val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm" val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port" - // whether turn on GC log, true or false + // Whether to turn on GC log, true or false val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc" - // the time out for Future, like ask. + // The time out for Future, like ask. // !Important! This global timeout setting will also impact the UI // responsive time if set to too big. Please make sure you have // enough justification to change this global setting, otherwise @@ -101,15 +103,14 @@ object Constants { val GEARPUMP_APP_JAR = "gearpump.app.jar" val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix" - // where the jar is stored at. It can be a HDFS, or a local disk. + // Where the jar is stored at. It can be a HDFS, or a local disk. val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath" - // Use java property -Dgearpump.config.file=xxx.conf to set customized configuration + // Uses java property -Dgearpump.config.file=xxx.conf to set customized configuration // Otherwise application.conf in classpath will be loaded val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file" - - //Metrics related + // Metrics related val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled" val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate" val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms" @@ -117,13 +118,13 @@ object Constants { val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port" val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter" - // we will retain at max @RETAIN_HISTORY_HOURS history data + // Retains at max @RETAIN_HISTORY_HOURS history data val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = "gearpump.metrics.retainHistoryData.hours" - // time interval between two history data points. + // Time interval between two history data points. val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = "gearpump.metrics.retainHistoryData.intervalMs" - // we will retain at max @RETAIN_LATEST_SECONDS recent data points + // Retains at max @RETAIN_LATEST_SECONDS recent data points val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = "gearpump.metrics.retainRecentData.seconds" // time interval between two recent data points. @@ -133,13 +134,13 @@ object Constants { // and shutdown itself val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = "gearpump.resource-allocation-timeout-seconds" - //Service related + // Service related val GEARPUMP_SERVICE_HTTP = "gearpump.services.http" val GEARPUMP_SERVICE_HOST = "gearpump.services.host" val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path" val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise" - //The partitioners provided by Gearpump + // The partitioners provided by Gearpump val BUILTIN_PARTITIONERS = Array( classOf[BroadcastPartitioner], classOf[CoLocationPartitioner], @@ -147,11 +148,10 @@ object Constants { classOf[ShuffleGroupingPartitioner], classOf[ShufflePartitioner]) - //Security related + // Security related val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file" val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal" - val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query" val GEARPUMP_METRICS_AGGREGATORS = "gearpump.metrics.akka.metrics-aggregator-class" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/FileUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/FileUtils.scala b/core/src/main/scala/io/gearpump/util/FileUtils.scala index b0e8446..1561587 100644 --- a/core/src/main/scala/io/gearpump/util/FileUtils.scala +++ b/core/src/main/scala/io/gearpump/util/FileUtils.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,9 +18,10 @@ package io.gearpump.util +import java.io.{File, IOException} import java.nio.charset.Charset + import io.gearpump.google.common.io.Files -import java.io.{IOException, File} object FileUtils { private val UTF8 = Charset.forName("UTF-8") @@ -41,7 +42,7 @@ object FileUtils { Files.toByteArray(file) } - // recursively making all parent directories including itself + /** recursively making all parent directories including itself */ def forceMkdir(directory: File): Unit = { if (directory.exists() && directory.isFile) { throw new IOException(s"Failed to create directory ${directory.toString}, it already exist") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/Graph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Graph.scala b/core/src/main/scala/io/gearpump/util/Graph.scala index 6bff9da..8c34329 100644 --- a/core/src/main/scala/io/gearpump/util/Graph.scala +++ b/core/src/main/scala/io/gearpump/util/Graph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,14 @@ */ package io.gearpump.util +import scala.annotation.tailrec import scala.collection.mutable import scala.language.implicitConversions /** * Generic mutable Graph libraries. - * */ -class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable{ +class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable { private val _vertices = mutable.Set.empty[N] private val _edges = mutable.Set.empty[(N, E, N)] @@ -50,7 +50,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Add a vertex * Current Graph is changed. */ - def addVertex(vertex : N): Unit = { + def addVertex(vertex: N): Unit = { val result = _vertices.add(vertex) if (result) { _indexs += vertex -> nextId @@ -73,14 +73,14 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * The result is stable */ def vertices: List[N] = { - // sort the vertex so that we can keep the order for mapVertex + // Sorts the vertex so that we can keep the order for mapVertex _vertices.toList.sortBy(_indexs(_)) } /** * out degree */ - def outDegreeOf(node : N): Int = { + def outDegreeOf(node: N): Int = { edges.count(_._1 == node) } @@ -94,7 +94,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial /** * out going edges. */ - def outgoingEdgesOf(node : N): List[(N, E, N)] = { + def outgoingEdgesOf(node: N): List[(N, E, N)] = { edges.filter(_._1 == node) } @@ -129,7 +129,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * add edge * Current Graph is changed. */ - def addEdge(node1 : N, edge: E, node2: N): Unit = { + def addEdge(node1: N, edge: E, node2: N): Unit = { addVertex(node1) addVertex(node2) addEdge((node1, edge, node2)) @@ -155,7 +155,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current graph is not changed. */ def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = { - val newEdges = edges.map {edge => + val newEdges = edges.map { edge => (edge._1, fun(edge._1, edge._2, edge._3), edge._3) } new Graph(vertices, newEdges) @@ -164,7 +164,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial /** * edges connected to node */ - def edgesOf(node : N): List[(N, E, N)] = { + def edgesOf(node: N): List[(N, E, N)] = { (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_)) } @@ -179,9 +179,9 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Add another graph * Current graph is changed. */ - def addGraph(other : Graph[N, E]) : Graph[N, E] = { + def addGraph(other: Graph[N, E]): Graph[N, E] = { (vertices ++ other.vertices).foreach(addVertex(_)) - (edges ++ other.edges).foreach(edge =>addEdge(edge._1, edge._2, edge._3)) + (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3)) this } @@ -247,7 +247,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial val newGraph = copy var output = List.empty[N] - while(!newGraph.isEmpty) { + while (!newGraph.isEmpty) { output ++= newGraph.removeZeroInDegree } output.iterator @@ -255,6 +255,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial /** * Return all circles in graph. + * * The reference of this algorithm is: * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm */ @@ -278,11 +279,13 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial edge => { if (!indexMap.contains(edge._3)) { tarjan(edge._3) - if (lowLink.get(edge._3).get < lowLink.get(node).get) + if (lowLink.get(edge._3).get < lowLink.get(node).get) { lowLink(node) = lowLink(edge._3) + } } else { - if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) + if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) { lowLink(node) = indexMap(edge._3) + } } } } @@ -311,6 +314,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial /** * Return an iterator of vertex in topological order of graph with circles * The node returned by Iterator is stable sorted. + * * The reference of this algorithm is: * http://www.drdobbs.com/database/topological-sorting/184410262 */ @@ -345,10 +349,11 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * check whether there is a loop */ def hasCycle(): Boolean = { - @annotation.tailrec def detectCycle(graph: Graph[N, E]): Boolean = { - if(graph.edges.isEmpty) { + @tailrec + def detectCycle(graph: Graph[N, E]): Boolean = { + if (graph.edges.isEmpty) { false - } else if(graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) { + } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) { true } else { graph.removeZeroInDegree @@ -367,23 +372,25 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial } /** - * Generate a level map for each vertex - * withholding: if vertex A -> B, then level(A) < level(B) + * Generate a level map for each vertex withholding: + * {{{ + * if vertex A -> B, then level(A) -> level(B) + * }}} */ def vertexHierarchyLevelMap(): Map[N, Int] = { val newGraph = copy var output = Map.empty[N, Int] var level = 0 - while(!newGraph.isEmpty) { + while (!newGraph.isEmpty) { output ++= newGraph.removeZeroInDegree.map((_, level)).toMap level += 1 } output } - override def toString = { + override def toString: String = { Map("vertices" -> vertices.mkString(","), - "edges" -> edges.mkString(",")).toString() + "edges" -> edges.mkString(",")).toString() } } @@ -391,6 +398,8 @@ object Graph { /** * Example: + * + * {{{ * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11) * Will create a graph with: * nodes: @@ -399,17 +408,17 @@ object Graph { * 2: (1->4) * 5: (4->7) * 9: (8->55) - * + * }}} */ def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = { val graph = empty[N, E] - elems.foreach{ path => + elems.foreach { path => path.updategraph(graph) } graph } - def apply[N , E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = { + def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = { new Graph(vertices, edges) } @@ -417,11 +426,11 @@ object Graph { Some((graph.vertices, graph.edges)) } - def empty[N, E] = { + def empty[N, E]: Graph[N, E] = { new Graph(List.empty[N], List.empty[(N, E, N)]) } - class Path[N, +E](path: List[Either[N, E]]) { + class Path[N, + E](path: List[Either[N, E]]) { def ~[Edge >: E](edge: Edge): Path[N, Edge] = { new Path(path :+ Right(edge)) @@ -458,7 +467,7 @@ object Graph { } implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) { - + override def ~[Edge](edge: Edge): Path[N, Edge] = { new Path(List(Left(self), Right(edge))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala index 20a3fe6..7552444 100644 --- a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala +++ b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,19 @@ package io.gearpump.util import java.util +import scala.collection.mutable.ListBuffer import akka.actor.Actor import com.typesafe.config.Config +import org.slf4j.Logger + import io.gearpump.TimeStamp -import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics} +import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption} import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} import io.gearpump.metrics.Metrics._ import io.gearpump.metrics.MetricsAggregator import io.gearpump.util.Constants._ -import io.gearpump.util.HistoryMetricsService.{HistoryMetricsStore, SkipAllAggregator, DummyMetricsAggregator, MetricsStore, HistoryMetricsConfig} -import org.slf4j.Logger - -import scala.collection.mutable.ListBuffer +import io.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator} /** * @@ -44,7 +44,6 @@ import scala.collection.mutable.ListBuffer * for each hour. * * For fine-grained data in last 5 min, there will be 1 sample point per 15 seconds. - * */ class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends Actor { private val LOG: Logger = LogUtil.getLogger(getClass, name = name) @@ -75,14 +74,15 @@ class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends } + ".*$" } - private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption): List[HistoryMetricsItem] = { + private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption) + : List[HistoryMetricsItem] = { val result = new ListBuffer[HistoryMetricsItem] val regex = toRegularExpression(pathPattern).r.pattern val iter = metricsStore.iterator - while(iter.hasNext) { + while (iter.hasNext) { val (name, store) = iter.next() val matcher = regex.matcher(name) @@ -95,7 +95,7 @@ class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends case ReadOption.ReadHistory => result.append(store.readHistory: _*) case _ => - //skip all other options. + // Skip all other options. } } } @@ -106,43 +106,42 @@ class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, MetricsAggregator] import scala.collection.JavaConverters._ - val validAggregators = { - systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped.keySet().asScala.toSet + private val validAggregators: Set[String] = { + val rootConfig = systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped + rootConfig.keySet().asScala.toSet } def commandHandler: Receive = { - //path accept syntax ? *, ? will match one char, * will match at least one char + // Path accept syntax ? *, ? will match one char, * will match at least one char case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) => - val aggregator = { - if (aggregatorClazz == null || aggregatorClazz.isEmpty) { - dummyAggregator - } else if (aggregators.contains(aggregatorClazz)) { - aggregators(aggregatorClazz) - } else if (validAggregators.contains(aggregatorClazz)) { - val clazz = Class.forName(aggregatorClazz) - val constructor = clazz.getConstructor(classOf[Config]) - val aggregator = constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator] - aggregators += aggregatorClazz -> aggregator - aggregator - } else { - LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, we will drop all messages. " + - s"Please see config at ${GEARPUMP_METRICS_AGGREGATORS}") - val skipAll = new SkipAllAggregator - aggregators += aggregatorClazz -> new SkipAllAggregator - skipAll - } - } - - import collection.JavaConversions._ - val metrics = fetchMetricsHistory(inputPath, readOption).iterator + val aggregator = { + if (aggregatorClazz == null || aggregatorClazz.isEmpty) { + dummyAggregator + } else if (aggregators.contains(aggregatorClazz)) { + aggregators(aggregatorClazz) + } else if (validAggregators.contains(aggregatorClazz)) { + val clazz = Class.forName(aggregatorClazz) + val constructor = clazz.getConstructor(classOf[Config]) + val aggregator = constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator] + aggregators += aggregatorClazz -> aggregator + aggregator + } else { + LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, " + + s"we will drop all messages. Please see config at ${GEARPUMP_METRICS_AGGREGATORS}") + val skipAll = new SkipAllAggregator + aggregators += aggregatorClazz -> new SkipAllAggregator + skipAll + } + } + + val metrics = fetchMetricsHistory(inputPath, readOption).iterator sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, metrics)) - } + } } object HistoryMetricsService { - trait MetricsStore { def add(inputMetrics: MetricType): Unit @@ -169,7 +168,7 @@ object HistoryMetricsService { def readHistory: List[HistoryMetricsItem] } - class DummyHistoryMetricsStore extends HistoryMetricsStore{ + class DummyHistoryMetricsStore extends HistoryMetricsStore { val empty = List.empty[HistoryMetricsItem] @@ -187,7 +186,8 @@ object HistoryMetricsService { } object HistoryMetricsStore { - def apply(name: String, metric: MetricType, config: HistoryMetricsConfig): HistoryMetricsStore = { + def apply(name: String, metric: MetricType, config: HistoryMetricsConfig) + : HistoryMetricsStore = { metric match { case histogram: Histogram => new HistogramMetricsStore(config) case meter: Meter => new MeterMetricsStore(config) @@ -199,18 +199,18 @@ object HistoryMetricsService { } /** - ** Metrics store to store history data points + * Metrics store to store history data points * For each time point, we will store single data point. * * @param retainCount how many data points to retain, old data will be removed * @param retainIntervalMs time interval between two data points. */ - class SingleValueMetricsStore (retainCount: Int, retainIntervalMs: Long) extends MetricsStore{ + class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) extends MetricsStore { - private val queue = new util.ArrayDeque[HistoryMetricsItem]() + private val queue = new util.ArrayDeque[HistoryMetricsItem]() private var latest = List.empty[HistoryMetricsItem] - // end of the time window we are tracking + // End of the time window we are tracking private var endTime = 0L override def add(inputMetrics: MetricType): Unit = { @@ -226,7 +226,7 @@ object HistoryMetricsService { queue.addFirst(metrics) endTime = (now / retainIntervalMs + 1) * retainIntervalMs - // remove old data + // Removes old data if (queue.size() > retainCount) { queue.removeLast() } @@ -235,8 +235,8 @@ object HistoryMetricsService { def read: List[HistoryMetricsItem] = { val result = new ListBuffer[HistoryMetricsItem] - import scala.collection.JavaConversions.asScalaIterator - queue.iterator().foreach(result.prepend(_)) + import scala.collection.JavaConverters._ + queue.iterator().asScala.foreach(result.prepend(_)) result.toList } @@ -246,11 +246,14 @@ object HistoryMetricsService { } /** + * Config for how long to keep history metrics data. * * @param retainHistoryDataHours Retain at max @RETAIN_HISTORY_HOURS history data(unit hour) * @param retainHistoryDataIntervalMs time interval between two history data points.(unit: ms) - * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS recent data points(unit: seconds) - * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS recent data points(unit: ms) + * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS + * recent data points(unit: seconds) + * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS recent + * data points(unit: ms) */ case class HistoryMetricsConfig( retainHistoryDataHours: Int, @@ -385,14 +388,17 @@ object HistoryMetricsService { } class DummyMetricsAggregator extends MetricsAggregator { - def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = { - import scala.collection.JavaConverters._ + def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) + : List[HistoryMetricsItem] = { inputs.toList } } class SkipAllAggregator extends MetricsAggregator { private val empty = List.empty[HistoryMetricsItem] - def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = empty + def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) + : List[HistoryMetricsItem] = { + empty + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/LogUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/LogUtil.scala b/core/src/main/scala/io/gearpump/util/LogUtil.scala index 5fa3136..1669129 100644 --- a/core/src/main/scala/io/gearpump/util/LogUtil.scala +++ b/core/src/main/scala/io/gearpump/util/LogUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,27 +16,26 @@ * limitations under the License. */ - package io.gearpump.util import java.io.File import java.net.InetAddress import java.util.Properties +import scala.util.Try import com.typesafe.config.Config import org.apache.log4j.PropertyConfigurator import org.slf4j.{Logger, LoggerFactory} -import scala.util.Try - object LogUtil { object ProcessType extends Enumeration { type ProcessType = Value val MASTER, WORKER, LOCAL, APPLICATION, UI = Value } - def getLogger[T](clazz : Class[T], context : String = null, master : Any = null, worker : Any = null, executor : Any = null, task : Any = null, app : Any = null, name: String = null) : Logger = { - + def getLogger[T]( + clazz: Class[T], context: String = null, master: Any = null, worker: Any = null, + executor: Any = null, task: Any = null, app: Any = null, name: String = null): Logger = { var env = "" if (null != context) { @@ -70,8 +69,9 @@ object LogUtil { } } - def loadConfiguration(config : Config, processType : ProcessType.ProcessType) : Unit = { - //set log file name + /** Custom the log file locations by reading config from system properties */ + def loadConfiguration(config: Config, processType: ProcessType.ProcessType): Unit = { + // Set log file name val propName = s"gearpump.${processType.toString.toLowerCase}.log.file" val props = loadConfiguration @@ -82,7 +82,8 @@ object LogUtil { processType match { case ProcessType.APPLICATION => props.setProperty("log4j.rootAppender", "${gearpump.application.logger}") - props.setProperty("gearpump.application.log.rootdir", applicationLogDir(config).getAbsolutePath) + props.setProperty("gearpump.application.log.rootdir", + applicationLogDir(config).getAbsolutePath) case _ => props.setProperty("log4j.rootAppender", "${gearpump.root.logger}") props.setProperty("gearpump.log.dir", daemonLogDir(config).getAbsolutePath) @@ -96,7 +97,7 @@ object LogUtil { new File(dir) } - def verboseLogToConsole: Unit = { + def verboseLogToConsole(): Unit = { val props = loadConfiguration props.setProperty("log4j.rootLogger", "DEBUG,console") PropertyConfigurator.configure(props) @@ -105,14 +106,14 @@ object LogUtil { def loadConfiguration: Properties = { val props = new Properties() val log4jConfStream = getClass().getClassLoader.getResourceAsStream("log4j.properties") - if(log4jConfStream!=null) { + if (log4jConfStream != null) { props.load(log4jConfStream) } log4jConfStream.close() props } - private def jvmName : String = { + private def jvmName: String = { val hostname = Try(InetAddress.getLocalHost.getHostName).getOrElse("local") java.lang.management.ManagementFactory.getRuntimeMXBean().getName() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala index 13f9ea8..0b843f3 100644 --- a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala +++ b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,11 +19,11 @@ package io.gearpump.util import java.io.{Closeable, Flushable} +import scala.sys.process.ProcessLogger import org.slf4j.LoggerFactory -import scala.sys.process.ProcessLogger - +/** Redirect the console output to parent process */ class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable with ConsoleOutput { private val LOG = LoggerFactory.getLogger("redirect") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala index 1142bdc..f6c7a2b 100644 --- a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala +++ b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,13 +18,16 @@ package io.gearpump.util - /** * Check equal using reference-equal. */ trait ReferenceEqual extends AnyRef { - override def equals(other : Any) : Boolean = { + override def equals(other: Any): Boolean = { this.eq(other.asInstanceOf[AnyRef]) } + + override def hashCode(): Int = { + super.hashCode() + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/RestartPolicy.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala index 787b508..245cb1b 100644 --- a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala +++ b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala @@ -15,18 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.util -import akka.actor.{ChildRestartStats, ActorRef} +package io.gearpump.util import scala.concurrent.duration.Duration +import akka.actor.ChildRestartStats + /** - * @param maxNrOfRetries the number of times is allowed to be restarted, negative value means no limit, - * if the limit is exceeded the policy will not allow to restart - * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window + * When one executor or task fails, Gearpump will try to start. However, if it fails after + * multiple retries, then we abort. + * + * @param maxNrOfRetries The number of times is allowed to be restarted, negative value means no + * limit, if the limit is exceeded the policy will not allow to restart + * @param withinTimeRange Duration of the time window for maxNrOfRetries. + * Duration.Inf means no window */ -class RestartPolicy (maxNrOfRetries: Int, withinTimeRange: Duration) { +class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) { private val status = new ChildRestartStats(null, 0, 0L) private val retriesWindow = (Some(maxNrOfRetries), Some(withinTimeRange.toMillis.toInt)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/RichProcess.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/RichProcess.scala b/core/src/main/scala/io/gearpump/util/RichProcess.scala index bfe0607..ab5611f 100644 --- a/core/src/main/scala/io/gearpump/util/RichProcess.scala +++ b/core/src/main/scala/io/gearpump/util/RichProcess.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,7 +25,9 @@ trait ConsoleOutput { def error: String } -class RichProcess(process: Process, val logger: ConsoleOutput) extends Process { - def exitValue() : scala.Int = process.exitValue() - def destroy() : scala.Unit = process.destroy() +/** Extends Process by providing a additional logger: ConsoleOutput interface. */ +class RichProcess(process: Process, _logger: ConsoleOutput) extends Process { + def exitValue(): scala.Int = process.exitValue() + def destroy(): scala.Unit = process.destroy() + def logger: ConsoleOutput = _logger } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala index 1464b21..64b920c 100644 --- a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala +++ b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,20 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.util import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ import akka.actor.{Actor, ActorRef} import akka.pattern.ask -import scala.concurrent.duration._ - +/** A helper util to send a message to remote actor and notify callback when timeout */ trait TimeOutScheduler { this: Actor => import context.dispatcher - def sendMsgWithTimeOutCallBack(target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = { + def sendMsgWithTimeOutCallBack( + target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = { val result = target.ask(msg)(FiniteDuration(milliSeconds, TimeUnit.MILLISECONDS)) result onSuccess { case msg => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/Util.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Util.scala b/core/src/main/scala/io/gearpump/util/Util.scala index 96eddfd..8ed9bb3 100644 --- a/core/src/main/scala/io/gearpump/util/Util.scala +++ b/core/src/main/scala/io/gearpump/util/Util.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,18 @@ package io.gearpump.util -import java.io.{DataInputStream, FileInputStream, File} -import java.net.{URL, URI, ServerSocket} -import java.util.jar.Manifest -import com.typesafe.config.{ConfigFactory, Config} -import io.gearpump.cluster.AppJar -import io.gearpump.jarstore.{FilePath, JarStoreService} -import io.gearpump.transport.HostPort - +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} +import java.net.{ServerSocket, URI} import scala.concurrent.forkjoin.ThreadLocalRandom -import scala.sys.process.{ProcessLogger, Process} +import scala.sys.process.Process import scala.util.{Failure, Success, Try} +import com.typesafe.config.{Config, ConfigFactory} + +import io.gearpump.cluster.AppJar +import io.gearpump.jarstore.JarStoreService +import io.gearpump.transport.HostPort + object Util { val LOG = LogUtil.getLogger(getClass) private val defaultUri = new URI("file:///") @@ -39,7 +39,7 @@ object Util { appNamePattern.matcher(appName).matches() } - def getCurrentClassPath : Array[String] = { + def getCurrentClassPath: Array[String] = { val classpath = System.getProperty("java.class.path") val classpathList = classpath.split(File.pathSeparator) classpathList @@ -48,8 +48,9 @@ object Util { def version: String = { val home = System.getProperty(Constants.GEARPUMP_HOME) val version = Try { - val versionFile = new DataInputStream(new FileInputStream(new File(home, "VERSION"))) - val version = versionFile.readLine().replace("version:=", "") + val versionFile = new FileInputStream(new File(home, "VERSION")) + val reader = new BufferedReader(new InputStreamReader(versionFile)) + val version = reader.readLine().replace("version:=", "") versionFile.close() version } @@ -62,11 +63,13 @@ object Util { } } - def startProcess(options : Array[String], classPath : Array[String], mainClass : String, - arguments : Array[String]) : RichProcess = { + def startProcess(options: Array[String], classPath: Array[String], mainClass: String, + arguments: Array[String]): RichProcess = { val java = System.getProperty("java.home") + "/bin/java" - val command = List(java) ++ options ++ List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments - LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} \n ${options.mkString(" ")}") + val command = List(java) ++ options ++ + List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments + LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " + + s"\n ${options.mkString(" ")}") val logger = new ProcessLogRedirector() val process = Process(command).run(logger) new RichProcess(process, logger) @@ -75,7 +78,7 @@ object Util { /** * hostList format: host1:port1,host2:port2,host3:port3... */ - def parseHostList(hostList : String) : List[HostPort] = { + def parseHostList(hostList: String): List[HostPort] = { val masters = hostList.trim.split(",").map { address => val hostAndPort = address.split(":") HostPort(hostAndPort(0), hostAndPort(1).toInt) @@ -85,7 +88,7 @@ object Util { def resolvePath(path: String): String = { val uri = new URI(path) - if(uri.getScheme == null && uri.getFragment == null) { + if (uri.getScheme == null && uri.getFragment == null) { val absolutePath = new File(path).getCanonicalPath.replaceAll("\\\\", "/") "file://" + absolutePath } else { @@ -106,16 +109,16 @@ object Util { } } - def randInt: Int = { + def randInt(): Int = { Math.abs(ThreadLocalRandom.current.nextInt()) } - def findFreePort: Try[Int] = { + def findFreePort(): Try[Int] = { Try { - val socket = new ServerSocket(0); - socket.setReuseAddress(true); - val port = socket.getLocalPort(); - socket.close; + val socket = new ServerSocket(0) + socket.setReuseAddress(true) + val port = socket.getLocalPort() + socket.close port } } @@ -132,7 +135,6 @@ object Util { * Then you can use like this: * * filterOutOrigin(config, "reference.conf") - * */ import scala.collection.JavaConverters._ def filterOutOrigin(config: Config, originFile: String): Config = { @@ -148,16 +150,19 @@ object Util { } } - case class JvmSetting(vmargs : Array[String], classPath : Array[String]) + case class JvmSetting(vmargs: Array[String], classPath: Array[String]) - case class AppJvmSettings(appMater : JvmSetting, executor : JvmSetting) + case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting) - def resolveJvmSetting(conf : Config) : AppJvmSettings = { + /** Get an effective AppJvmSettings from Config */ + def resolveJvmSetting(conf: Config): AppJvmSettings = { - import Constants._ + import io.gearpump.util.Constants._ - val appMasterVMArgs = Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+").filter(_.nonEmpty)).toOption - val executorVMArgs = Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+").filter(_.nonEmpty)).toOption + val appMasterVMArgs = Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+") + .filter(_.nonEmpty)).toOption + val executorVMArgs = Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+") + .filter(_.nonEmpty)).toOption val appMasterClassPath = Try( conf.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) @@ -169,7 +174,7 @@ object Util { AppJvmSettings( JvmSetting(appMasterVMArgs.getOrElse(Array.empty[String]), - appMasterClassPath.getOrElse(Array.empty[String]) ), + appMasterClassPath.getOrElse(Array.empty[String])), JvmSetting(executorVMArgs .getOrElse(Array.empty[String]), executorClassPath.getOrElse(Array.empty[String]))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 870a69f..0faadd9 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -7,7 +7,7 @@ # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -50,7 +50,6 @@ log4j.rootLogger=INFO,console # Logging Threshold log4j.threshhold=ALL - # ===================================================================== # Appenders # ===================================================================== @@ -115,4 +114,3 @@ log4j.appender.ALA.layout=org.apache.log4j.PatternLayout #log4j.appender.ALA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n log4j.appender.ALA.layout.ConversionPattern=[%p] [%d{MM/dd/yyyy HH:mm:ss.SSS}] [%c{1}] %m%n - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/resources/test.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf index 324e8bd..ac18c88 100644 --- a/core/src/test/resources/test.conf +++ b/core/src/test/resources/test.conf @@ -97,7 +97,6 @@ gearpump-ui { } } - akka { logger-startup-timeout = 30s @@ -111,7 +110,6 @@ akka { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" } default-dispatcher { - mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" throughput = 10 fork-join-executor { parallelism-factor = 2 @@ -145,7 +143,6 @@ akka { default-remote-dispatcher { throughput = 5 type = Dispatcher - mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" executor = "fork-join-executor" fork-join-executor { parallelism-min = 1 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/TestProbeUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/TestProbeUtil.scala b/core/src/test/scala/io/gearpump/TestProbeUtil.scala index 9b808dd..e7181db 100644 --- a/core/src/test/scala/io/gearpump/TestProbeUtil.scala +++ b/core/src/test/scala/io/gearpump/TestProbeUtil.scala @@ -18,11 +18,13 @@ package io.gearpump -import akka.actor.{Terminated, Actor, Props} +import scala.language.implicitConversions + +import akka.actor.{Actor, Props, Terminated} import akka.testkit.TestProbe object TestProbeUtil { - implicit def toProps(probe: TestProbe) = { + implicit def toProps(probe: TestProbe): Props = { Props(new Actor { val probeRef = probe.ref context.watch(probeRef) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala index 21d145a..b1ca357 100644 --- a/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala +++ b/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,24 +19,20 @@ package io.gearpump.cluster import java.io.File -import io.gearpump.util.{Constants, ActorUtil} -import io.gearpump.util.{FileUtils} -import java.net.{UnknownHostException, SocketTimeoutException, Socket, InetSocketAddress, ServerSocket, URLClassLoader} +import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException} import java.util.Properties import java.util.concurrent.{Executors, TimeUnit} +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} import akka.actor.{Actor, ActorSystem, Address, Props} import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory} + import io.gearpump.cluster.MasterHarness.MockMaster import io.gearpump.util.Constants._ -import io.gearpump.util.{Constants, LogUtil, ActorUtil, Util} - -import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.Duration -import scala.sys.process.Process -import scala.util.Try +import io.gearpump.util.{ActorUtil, FileUtils, LogUtil} trait MasterHarness { private val LOG = LogUtil.getLogger(getClass) @@ -54,7 +50,7 @@ trait MasterHarness { def getHost: String = host def getPort: Int = port - def config : Config + protected def config: Config def startActorSystem(): Unit = { val systemConfig = config @@ -69,13 +65,13 @@ trait MasterHarness { LOG.info(s"Actor system is started, $host, $port") } - def shutdownActorSystem():Unit = { - system.shutdown() - system.awaitTermination + def shutdownActorSystem(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) LOG.info(s"Actor system is stopped, $host, $port") } - def convertTestConf(host : String, port : Int) : File = { + def convertTestConf(host: String, port: Int): File = { val test = ConfigFactory.parseResourcesAnySyntax("test.conf", ConfigParseOptions.defaults.setAllowMissing(true)) @@ -88,13 +84,13 @@ trait MasterHarness { confFile } - def createMockMaster() : TestProbe = { + def createMockMaster(): TestProbe = { val masterReceiver = TestProbe()(system) val master = system.actorOf(Props(classOf[MockMaster], masterReceiver), MASTER) masterReceiver } - def isPortUsed(host : String, port : Int) : Boolean = { + def isPortUsed(host: String, port: Int): Boolean = { var isPortUsed = true val socket = new Socket() @@ -103,12 +99,12 @@ trait MasterHarness { socket.connect(new InetSocketAddress(host, port), 1000) socket.isConnected } catch { - case ex: SocketTimeoutException => + case ex: SocketTimeoutException => isPortUsed = false case ex: UnknownHostException => isPortUsed = false case ex: Throwable => - // for other case, we think the port is listened + // For other case, we think the port has been occupied. isPortUsed = true } finally { socket.close() @@ -116,7 +112,7 @@ trait MasterHarness { isPortUsed } - def getContextClassPath : Array[String] = { + def getContextClassPath: Array[String] = { val contextLoader = Thread.currentThread().getContextClassLoader() val urlLoader = if (!contextLoader.isInstanceOf[URLClassLoader]) { @@ -135,19 +131,16 @@ trait MasterHarness { /** * Remove trailing $ */ - def getMainClassName(mainObj : Any) : String = { + def getMainClassName(mainObj: Any): String = { mainObj.getClass.getName.dropRight(1) } - import Constants._ - def getMasterListOption(): Array[String] = { masterProperties.asScala.toList.map { kv => s"-D${kv._1}=${kv._2}" }.toArray } - def masterConfig: Config = { ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/TestUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/TestUtil.scala b/core/src/test/scala/io/gearpump/cluster/TestUtil.scala index e491fc7..dce4902 100644 --- a/core/src/test/scala/io/gearpump/cluster/TestUtil.scala +++ b/core/src/test/scala/io/gearpump/cluster/TestUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.cluster import akka.actor._ @@ -25,10 +26,11 @@ object TestUtil { val UI_CONFIG = ClusterConfig.ui("test.conf") class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends ApplicationMaster { - context.masterProxy ! (context, app) + context.masterProxy !(context, app) - def receive : Receive = null + def receive: Receive = null } - val dummyApp : AppDescription = AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty) + val dummyApp: AppDescription = + AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty) } \ No newline at end of file
