http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala b/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala new file mode 100644 index 0000000..674445a --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import java.util.concurrent.{TimeUnit, TimeoutException} +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} + +import akka.actor._ +import com.typesafe.config.Config +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.util.LogUtil.ProcessType + +/** + * ActorSystemBooter start a new JVM process to boot an actor system. 
+ * All executors are started by ActorSystemBooter + * + * It send the system address to "report back actor" + */ +class ActorSystemBooter(config: Config) { + import org.apache.gearpump.util.ActorSystemBooter._ + + def boot(name: String, reportBackActor: String): ActorSystem = { + val system = ActorSystem(name, config) + // Daemon path: http://{system}@{ip}:{port}/daemon + system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon") + system + } +} + +object ActorSystemBooter { + + def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config) + + def main(args: Array[String]) { + val name = args(0) + val reportBack = args(1) + val config = ClusterConfig.default() + + LogUtil.loadConfiguration(config, ProcessType.APPLICATION) + + val debugPort = Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT)) + debugPort.foreach { port => + val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass) + LOG.info("==========================================") + LOG.info("Remote debug port: " + port) + LOG.info("==========================================") + } + + val system = apply(config).boot(name, reportBack) + + Runtime.getRuntime().addShutdownHook(new Thread() { + override def run(): Unit = { + val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass) + LOG.info("Maybe we have received a SIGINT signal from parent process, " + + "start to cleanup resources....") + system.terminate() + } + }) + + Await.result(system.whenTerminated, Duration.Inf) + } + + case class BindLifeCycle(actor: ActorRef) + case class CreateActor(prop: Props, name: String) + case class ActorCreated(actor: ActorRef, name: String) + case class CreateActorFailed(name: String, reason: Throwable) + + case class RegisterActorSystem(systemPath: String) + + /** + * This actor system will watch for parent, + * If parent dies, this will also die + */ + case class ActorSystemRegistered(bindLifeWith: ActorRef) + case class RegisterActorSystemFailed(reason: Throwable) + + 
object RegisterActorSystemTimeOut + + class Daemon(val name: String, reportBack: String) extends Actor { + val LOG: Logger = LogUtil.getLogger(getClass, context = name) + + val username = Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined") + LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username") + + val reportBackActor = context.actorSelection(reportBack) + reportBackActor ! RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString) + + implicit val executionContext = context.dispatcher + val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS), + self, RegisterActorSystemFailed(new TimeoutException)) + + context.become(waitForRegisterResult) + + def receive: Receive = null + + def waitForRegisterResult: Receive = { + case ActorSystemRegistered(parent) => + timeout.cancel() + context.watch(parent) + context.become(waitCommand) + case RegisterActorSystemFailed(ex) => + LOG.error("RegisterActorSystemFailed", ex) + timeout.cancel() + context.stop(self) + } + + def waitCommand: Receive = { + case BindLifeCycle(actor) => + LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor") + context.watch(actor) + case create@CreateActor(props: Props, name: String) => + LOG.info(s"creating actor $name") + val actor = Try(context.actorOf(props, name)) + actor match { + case Success(actor) => + sender ! ActorCreated(actor, name) + case Failure(e) => + sender ! CreateActorFailed(props.clazz.getName, e) + } + case PoisonPill => + context.stop(self) + case Terminated(actor) => + LOG.info(s"System $name Watched actor is terminated $actor") + context.stop(self) + } + + override def postStop(): Unit = { + LOG.info(s"ActorSystem $name is shutting down...") + context.system.terminate() + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala new file mode 100644 index 0000000..b10163f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.util + +import org.apache.gearpump.cluster.worker.WorkerId + +import scala.concurrent.{ExecutionContext, Future} + +import akka.actor.Actor.Receive +import akka.actor._ +import akka.pattern.ask +import org.slf4j.Logger + +import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} +import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult} +import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems} +import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} +import org.apache.gearpump.transport.HostPort + +object ActorUtil { + private val LOG: Logger = LogUtil.getLogger(getClass) + + def getSystemAddress(system: ActorSystem): Address = { + system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + } + + def getFullPath(system: ActorSystem, path: ActorPath): String = { + path.toStringWithAddress(getSystemAddress(system)) + } + + def getHostname(actor: ActorRef): String = { + val path = actor.path + path.address.host.getOrElse("localhost") + } + + def defaultMsgHandler(actor: ActorRef): Receive = { + case msg: Any => + LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor") + } + + def printActorSystemTree(system: ActorSystem): Unit = { + val extendedSystem = system.asInstanceOf[ExtendedActorSystem] + val clazz = system.getClass + val m = clazz.getDeclaredMethod("printTree") + m.setAccessible(true) + LOG.info(m.invoke(system).asInstanceOf[String]) + } + + /** Checks whether a actor is child actor by simply examining name */ + // TODO: fix this, we should also check the path to root besides name + def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = { + if (null != child) { + parent.path.name == 
child.path.parent.name + } else { + false + } + } + + def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + appId + "-executor" + + executorId + + // TODO: Currently we explicitly require the master contacts to be started with this path pattern + // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER + def getMasterActorPath(master: HostPort): ActorPath = { + import org.apache.gearpump.util.Constants.MASTER + ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER") + } + + def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig, + sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = { + implicit val timeout = Constants.FUTURE_TIMEOUT + + (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list => + val resources = list.workers.map { + workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER) + }.toArray + + master.tell(StartExecutorSystems(resources, executorJvmConfig), sender) + } + } + + def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext) + : Future[T] = { + implicit val timeout = Constants.FUTURE_TIMEOUT + val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result => + if (result.appMaster.isSuccess) { + Future.successful(result.appMaster.get) + } else { + Future.failed(result.appMaster.failed.get) + } + } + appmaster.flatMap(askActor[T](_, msg)) + } + + def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext) + : Future[T] = { + implicit val timeout = Constants.FUTURE_TIMEOUT + val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId)) + .flatMap { result => + if (result.worker.isSuccess) { + Future.successful(result.worker.get) + } else { + Future.failed(result.worker.failed.get) + } + } + worker.flatMap(askActor[T](_, msg)) + } + + def askActor[T](actor: ActorRef, 
msg: Any)(implicit ex: ExecutionContext): Future[T] = { + implicit val timeout = Constants.FUTURE_TIMEOUT + (actor ? msg).asInstanceOf[Future[T]] + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala b/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala new file mode 100644 index 0000000..6060368 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import scala.util.Try + +import org.apache.gearpump.cluster.ClusterConfig + +/** + * A Main class helper to load Akka configuration automatically. 
+ */ +trait AkkaApp { + + type Config = com.typesafe.config.Config + + def main(akkaConf: Config, args: Array[String]): Unit + + def help(): Unit + + protected def akkaConfig: Config = { + ClusterConfig.default() + } + + def main(args: Array[String]): Unit = { + Try { + main(akkaConfig, args) + }.failed.foreach { ex => help(); throw ex } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala new file mode 100644 index 0000000..dba5a1f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.gearpump.util

import java.util.concurrent.TimeUnit

import org.apache.gearpump.partitioner._

/**
 * Shared constants: well-known names, string keys (most of them look like
 * configuration or system property keys), the global ask timeout and the list
 * of built-in partitioner classes.
 */
object Constants {
  val MASTER_WATCHER = "masterwatcher"
  val SINGLETON_MANAGER = "singleton"

  val MASTER_CONFIG = "gearpump-master"
  val WORKER_CONFIG = "gearpump-worker"
  val UI_CONFIG = "gearpump-ui"
  val LINUX_CONFIG = "gearpump-linux" // linux or Mac

  val MASTER = "master"
  val WORKER = "worker"

  val GEARPUMP_WORKER_SLOTS = "gearpump.worker.slots"
  val GEARPUMP_EXECUTOR_PROCESS_LAUNCHER = "gearpump.worker.executor-process-launcher"
  val GEARPUMP_SCHEDULING_SCHEDULER = "gearpump.scheduling.scheduler-class"
  val GEARPUMP_SCHEDULING_REQUEST = "gearpump.scheduling.requests"
  val GEARPUMP_TRANSPORT_SERIALIZER = "gearpump.transport.serializer"
  val GEARPUMP_SERIALIZER_POOL = "gearpump.serialization-framework"
  val GEARPUMP_SERIALIZERS = "gearpump.serializers"
  val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher"
  val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters"
  val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout"
  val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS =
    "gearpump.worker.executor-share-same-jvm-as-worker"

  val GEARPUMP_HOME = "gearpump.home"
  val GEARPUMP_FULL_SCALA_VERSION = "gearpump.binary-version-with-scala-version"
  val GEARPUMP_HOSTNAME = "gearpump.hostname"
  val GEARPUMP_APPMASTER_ARGS = "gearpump.appmaster.vmargs"
  val GEARPUMP_APPMASTER_EXTRA_CLASSPATH = "gearpump.appmaster.extraClasspath"
  val GEARPUMP_EXECUTOR_ARGS = "gearpump.executor.vmargs"
  val GEARPUMP_EXECUTOR_EXTRA_CLASSPATH = "gearpump.executor.extraClasspath"
  val GEARPUMP_LOG_DAEMON_DIR = "gearpump.log.daemon.dir"
  val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir"
  val HADOOP_CONF = "hadoopConf"

  // Id used to identify the Master JVM process in a low level resource manager like YARN.
  // In YARN, it means the container Id.
  val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID =
    "gearpump.master-resource-manager-container-id"

  // Id used to identify the Worker JVM process in a low level resource manager like YARN.
  // In YARN, it means the container Id.
  val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID =
    "gearpump.worker-resource-manager-container-id"

  // true or false
  val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm"
  val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port"

  // Whether to turn on GC log, true or false
  val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc"

  // The timeout for Futures, like ask.
  // !Important! This global timeout setting will also impact the UI
  // response time if it is set too big. Please make sure you have
  // enough justification to change this global setting, otherwise
  // please use your local timeout setting instead.
  val FUTURE_TIMEOUT = akka.util.Timeout(15, TimeUnit.SECONDS)

  val GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS = "gearpump.start-executor-system-timeout-ms"

  val APPMASTER_DEFAULT_EXECUTOR_ID = -1

  val NETTY_BUFFER_SIZE = "gearpump.netty.buffer-size"
  val NETTY_MAX_RETRIES = "gearpump.netty.max-retries"
  val NETTY_BASE_SLEEP_MS = "gearpump.netty.base-sleep-ms"
  val NETTY_MAX_SLEEP_MS = "gearpump.netty.max-sleep-ms"
  val NETTY_MESSAGE_BATCH_SIZE = "gearpump.netty.message-batch-size"
  val NETTY_FLUSH_CHECK_INTERVAL = "gearpump.netty.flush-check-interval"
  val NETTY_TCP_HOSTNAME = "akka.remote.netty.tcp.hostname"
  val NETTY_DISPATCHER = "gearpump.netty.dispatcher"

  val GEARPUMP_USERNAME = "gearpump.username"
  val GEARPUMP_APPLICATION_ID = "gearpump.applicationId"
  val GEARPUMP_MASTER_STARTTIME = "gearpump.master.starttime"
  val GEARPUMP_EXECUTOR_ID = "gearpump.executorId"
  // Application jar property
  val GEARPUMP_APP_JAR = "gearpump.app.jar"
  val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix"

  // Where the jar is stored. It can be HDFS or a local disk.
  val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath"

  // Uses java property -Dgearpump.config.file=xxx.conf to set customized configuration
  // Otherwise application.conf in classpath will be loaded
  val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file"

  // Metrics related
  val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled"
  val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate"
  val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms"
  val GEARPUMP_METRIC_GRAPHITE_HOST = "gearpump.metrics.graphite.host"
  val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port"
  val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter"

  // Retains at max @RETAIN_HISTORY_HOURS history data
  val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = "gearpump.metrics.retainHistoryData.hours"

  // Time interval between two history data points.
  val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = "gearpump.metrics.retainHistoryData.intervalMs"

  // Retains at max @RETAIN_LATEST_SECONDS recent data points
  val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = "gearpump.metrics.retainRecentData.seconds"

  // Time interval between two recent data points.
  val GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS = "gearpump.metrics.retainRecentData.intervalMs"

  // The AppMaster waits at most this long before it declares that the resource
  // cannot be allocated, and shuts itself down
  val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = "gearpump.resource-allocation-timeout-seconds"

  // Service related
  val GEARPUMP_SERVICE_HTTP = "gearpump.services.http"
  val GEARPUMP_SERVICE_HOST = "gearpump.services.host"
  val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path"
  val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise"

  // The partitioners provided by Gearpump
  val BUILTIN_PARTITIONERS = Array(
    classOf[BroadcastPartitioner],
    classOf[CoLocationPartitioner],
    classOf[HashPartitioner],
    classOf[ShuffleGroupingPartitioner],
    classOf[ShufflePartitioner])

  // Security related
  val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
  val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"

  val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query"
  val GEARPUMP_METRICS_AGGREGATORS = "gearpump.metrics.akka.metrics-aggregator-class"

  val GEARPUMP_UI_SECURITY = "gearpump.ui-security"
  val GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED = "gearpump.ui-security.authentication-enabled"
  val GEARPUMP_UI_AUTHENTICATOR_CLASS = "gearpump.ui-security.authenticator"
  // OAuth Authentication Factory for UI server.
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED = "gearpump.ui-security.oauth2-authenticator-enabled"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATORS = "gearpump.ui-security.oauth2-authenticators"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS = "class"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK = "callback"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID = "clientid"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET = "clientsecret"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE = "default-userrole"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE = "code"
  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN = "accesstoken"

  val PREFER_IPV4 = "java.net.preferIPv4Stack"

  val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"

  val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala new file mode 100644 index 0000000..d13f514 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala @@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.util

import java.io.{File, IOException}
import java.nio.charset.Charset

import io.gearpump.google.common.io.Files

/** Small file helpers: whole-file text and byte I/O (UTF-8 for text) and directory creation. */
object FileUtils {
  private val UTF8: Charset = Charset.forName("UTF-8")

  /** Writes `str` to `file`, encoded as UTF-8. */
  def write(file: File, str: String): Unit = {
    Files.write(str, file, UTF8)
  }

  /** Reads the whole content of `file`, decoded as UTF-8. */
  def read(file: File): String = {
    Files.asCharSource(file, UTF8).read()
  }

  /** Writes `bytes` to `file`. */
  def writeByteArrayToFile(file: File, bytes: Array[Byte]): Unit = {
    Files.write(bytes, file)
  }

  /** Reads the whole content of `file` as bytes. */
  def readFileToByteArray(file: File): Array[Byte] = {
    Files.toByteArray(file)
  }

  /**
   * Creates `directory` together with all missing parent directories.
   * An already existing directory is accepted as is.
   *
   * @throws IOException if the path exists as a regular file, or if the
   *                     directory could not be created
   */
  def forceMkdir(directory: File): Unit = {
    if (directory.exists() && directory.isFile) {
      throw new IOException(s"Failed to create directory ${directory.toString}, it already exist")
    }
    Files.createParentDirs(directory)
    // mkdir() also returns false when the directory already exists, so only report
    // an error if the directory is really missing afterwards. The result used to be
    // ignored, which hid creation failures from the caller.
    if (!directory.mkdir() && !directory.isDirectory) {
      throw new IOException(s"Failed to create directory ${directory.toString}")
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/Graph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala b/core/src/main/scala/org/apache/gearpump/util/Graph.scala new file mode 100644 index 0000000..6bb58b8 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala @@ -0,0 +1,496 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util +import scala.annotation.tailrec +import scala.collection.mutable +import scala.language.implicitConversions + +/** + * Generic mutable Graph libraries. + */ +class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable { + + private val _vertices = mutable.Set.empty[N] + private val _edges = mutable.Set.empty[(N, E, N)] + + // This is used to ensure the output of this Graph is always stable + // Like method vertices(), or edges() + private var _indexs = Map.empty[Any, Int] + private var _nextIndex = 0 + private def nextId: Int = { + val result = _nextIndex + _nextIndex += 1 + result + } + + private def init(): Unit = { + Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_)) + Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_)) + } + + init() + + /** + * Add a vertex + * Current Graph is changed. + */ + def addVertex(vertex: N): Unit = { + val result = _vertices.add(vertex) + if (result) { + _indexs += vertex -> nextId + } + } + + /** + * Add a edge + * Current Graph is changed. + */ + def addEdge(edge: (N, E, N)): Unit = { + val result = _edges.add(edge) + if (result) { + _indexs += edge -> nextId + } + } + + /** + * return all vertices. 
+ * The result is stable + */ + def vertices: List[N] = { + // Sorts the vertex so that we can keep the order for mapVertex + _vertices.toList.sortBy(_indexs(_)) + } + + /** + * out degree + */ + def outDegreeOf(node: N): Int = { + edges.count(_._1 == node) + } + + /** + * in degree + */ + def inDegreeOf(node: N): Int = { + edges.count(_._3 == node) + } + + /** + * out going edges. + */ + def outgoingEdgesOf(node: N): List[(N, E, N)] = { + edges.filter(_._1 == node) + } + + /** + * incoming edges. + */ + def incomingEdgesOf(node: N): List[(N, E, N)] = { + edges.filter(_._3 == node) + } + + /** + * Remove vertex + * Current Graph is changed. + */ + def removeVertex(node: N): Unit = { + _vertices.remove(node) + _indexs -= node + val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node) + toBeRemoved.foreach(removeEdge(_)) + } + + /** + * Remove edge + * Current Graph is changed. + */ + private def removeEdge(edge: (N, E, N)): Unit = { + _indexs -= edge + _edges.remove(edge) + } + + /** + * add edge + * Current Graph is changed. + */ + def addEdge(node1: N, edge: E, node2: N): Unit = { + addVertex(node1) + addVertex(node2) + addEdge((node1, edge, node2)) + } + + /** + * Map a graph to a new graph, with vertex converted to a new type + * Current Graph is not changed. + */ + def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = { + val vertexes = vertices.map(node => (node, fun(node))) + + val vertexMap: Map[N, NewNode] = vertexes.toMap + + val newEdges = edges.map { edge => + (vertexMap(edge._1), edge._2, vertexMap(edge._3)) + } + new Graph(vertexes.map(_._2), newEdges) + } + + /** + * Map a graph to a new graph, with edge converted to new type + * Current graph is not changed. 
+ */ + def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = { + val newEdges = edges.map { edge => + (edge._1, fun(edge._1, edge._2, edge._3), edge._3) + } + new Graph(vertices, newEdges) + } + + /** + * edges connected to node + */ + def edgesOf(node: N): List[(N, E, N)] = { + (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_)) + } + + /** + * all edges + */ + def edges: List[(N, E, N)] = { + _edges.toList.sortBy(_indexs(_)) + } + + /** + * Add another graph + * Current graph is changed. + */ + def addGraph(other: Graph[N, E]): Graph[N, E] = { + (vertices ++ other.vertices).foreach(addVertex(_)) + (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3)) + this + } + + /** + * clone the graph + */ + def copy: Graph[N, E] = { + new Graph(vertices, edges) + } + + /** + * check empty + */ + def isEmpty: Boolean = { + val vertexCount = vertices.size + val edgeCount = edges.length + if (vertexCount + edgeCount == 0) { + true + } else { + false + } + } + + /** + * sub-graph which contains current node and all neighbour + * nodes and edges. + */ + def subGraph(node: N): Graph[N, E] = { + val newGraph = Graph.empty[N, E] + for (edge <- edgesOf(node)) { + newGraph.addEdge(edge._1, edge._2, edge._3) + } + newGraph + } + + /** + * replace vertex, the current Graph is mutated. + */ + def replaceVertex(node: N, newNode: N): Graph[N, E] = { + for (edge <- incomingEdgesOf(node)) { + addEdge(edge._1, edge._2, newNode) + } + + for (edge <- outgoingEdgesOf(node)) { + addEdge(newNode, edge._2, edge._3) + } + removeVertex(node) + this + } + + private def removeZeroInDegree: List[N] = { + val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_)) + toBeRemoved.foreach(removeVertex(_)) + toBeRemoved + } + + /** + * Return an iterator of vertex in topological order + * The node returned by Iterator is stable sorted. 
+ */ + def topologicalOrderIterator: Iterator[N] = { + val newGraph = copy + var output = List.empty[N] + + while (!newGraph.isEmpty) { + output ++= newGraph.removeZeroInDegree + } + output.iterator + } + + /** + * Return all circles in graph. + * + * The reference of this algorithm is: + * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm + */ + private def findCircles: mutable.MutableList[mutable.MutableList[N]] = { + val inStack = mutable.Map.empty[N, Boolean] + val stack = mutable.Stack[N]() + val indexMap = mutable.Map.empty[N, Int] + val lowLink = mutable.Map.empty[N, Int] + var index = 0 + + val circles = mutable.MutableList.empty[mutable.MutableList[N]] + + def tarjan(node: N): Unit = { + indexMap(node) = index + lowLink(node) = index + index += 1 + inStack(node) = true + stack.push(node) + + outgoingEdgesOf(node).foreach { + edge => { + if (!indexMap.contains(edge._3)) { + tarjan(edge._3) + if (lowLink.get(edge._3).get < lowLink.get(node).get) { + lowLink(node) = lowLink(edge._3) + } + } else { + if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) { + lowLink(node) = indexMap(edge._3) + } + } + } + } + + if (indexMap.get(node).get == lowLink.get(node).get) { + val circle = mutable.MutableList.empty[N] + var n = node + do { + n = stack.pop() + inStack(n) = false + circle += n + } while (n != node) + circles += circle + } + } + + vertices.foreach { + node => { + if (!indexMap.contains(node)) tarjan(node) + } + } + + circles + } + + /** + * Return an iterator of vertex in topological order of graph with circles + * The node returned by Iterator is stable sorted. 
+ * + * The reference of this algorithm is: + * http://www.drdobbs.com/database/topological-sorting/184410262 + */ + def topologicalOrderWithCirclesIterator: Iterator[N] = { + val circles = findCircles + val newGraph = Graph.empty[mutable.MutableList[N], E] + circles.foreach { + circle => { + newGraph.addVertex(circle) + } + } + + for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield { + for (node1 <- circle1; node2 <- circle2) yield { + var edges = outgoingEdgesOf(node1) + for (edge <- edges; if edge._3 == node2) yield { + newGraph.addEdge(circle1, edge._2, circle2) + } + + edges = outgoingEdgesOf(node2) + for (edge <- edges; if edge._3 == node1) yield { + newGraph.addEdge(circle2, edge._2, circle1) + } + } + } + + val topo = newGraph.topologicalOrderIterator + topo.flatMap(_.sortBy(_indexs(_)).iterator) + } + + /** + * check whether there is a loop + */ + def hasCycle(): Boolean = { + @tailrec + def detectCycle(graph: Graph[N, E]): Boolean = { + if (graph.edges.isEmpty) { + false + } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) { + true + } else { + graph.removeZeroInDegree + detectCycle(graph) + } + } + + detectCycle(copy) + } + + /** + * Check whether there are two edges connecting two nodes. 
*/
  def hasDuplicatedEdge(): Boolean = {
    // Two edges are duplicates when they share both source (_1) and target (_3),
    // whatever their label (_2).
    edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
  }

  /**
   * Generate a level map for each vertex.
   *
   * Works on a copy of the graph: each pass assigns the current level to the vertices
   * returned by removeZeroInDegree, then moves on to the next level.
   *
   * Intended invariant (assuming removeZeroInDegree strips the vertices that currently have
   * no incoming edge -- confirm against its definition):
   * {{{
   * if vertex A -> B, then level(A) < level(B)
   * }}}
   */
  def vertexHierarchyLevelMap(): Map[N, Int] = {
    val newGraph = copy
    var output = Map.empty[N, Int]
    var level = 0
    while (!newGraph.isEmpty) {
      output ++= newGraph.removeZeroInDegree.map((_, level)).toMap
      level += 1
    }
    output
  }

  /** Renders the graph as a two-entry map: comma-joined vertices and comma-joined edges. */
  override def toString: String = {
    Map("vertices" -> vertices.mkString(","),
      "edges" -> edges.mkString(",")).toString()
  }
}

object Graph {

  /**
   * Builds a graph from a list of paths.
   *
   * Example:
   *
   * {{{
   * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11)
   * Will create a graph with:
   * nodes:
   * 1, 4, 7, 8, 55, 11
   * edge:
   * 2: (1->4)
   * 5: (4->7)
   * 9: (8->55)
   * }}}
   */
  def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = {
    val graph = empty[N, E]
    elems.foreach { path =>
      path.updategraph(graph)
    }
    graph
  }

  /** Builds a graph directly from a vertex list and a list of (source, label, target) edges. */
  def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = {
    new Graph(vertices, edges)
  }

  /** Extractor returning the graph's vertices and edges. */
  def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = {
    Some((graph.vertices, graph.edges))
  }

  /** A graph with no vertices and no edges. */
  def empty[N, E]: Graph[N, E] = {
    new Graph(List.empty[N], List.empty[(N, E, N)])
  }

  /**
   * A path under construction: a sequence where Left(node) is a vertex and Right(edge) is
   * the label of the edge leading to the next vertex.
   */
  class Path[N, + E](path: List[Either[N, E]]) {

    /** Appends an edge label; the next `~>` supplies its target node. */
    def ~[Edge >: E](edge: Edge): Path[N, Edge] = {
      new Path(path :+ Right(edge))
    }

    /** Appends a node. */
    def ~>[Node >: N](node: Node): Path[Node, E] = {
      new Path(path :+ Left(node))
    }

    /** Shorthand for `this ~ edge ~> node`. */
    def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
      this ~ edge ~> node
    }

    /**
     * Replays this path onto `graph`: every node is added as a vertex, and every node that
     * follows another node is connected to it. The fold state is (last node seen, edge label
     * seen since that node).
     */
    private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, Edge]): Unit = {
      val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None)
      path.foldLeft(nodeEdgePair) { (pair, either) =>
        val (lastNode, lastEdge) = pair
        either match {
          case Left(node) =>
            graph.addVertex(node)
            if (lastNode.isDefined) {
              // Two consecutive nodes without a label in between are linked by a null label.
              graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[Edge]), node)
            }
            (Some(node), None)
          case Right(edge) =>
            (lastNode, Some(edge))
        }
      }
    }
  }

  object Path {
    /** Lets a bare value be used where a Path is expected, as a single-node path. */
    implicit def anyToPath[N, E](any: N): Path[N, E] = Node(any)
  }

  /** Single-node path; as an implicit class it also gives any value the `~` / `~>` syntax. */
  implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) {

    override def ~[Edge](edge: Edge): Path[N, Edge] = {
      new Path(List(Left(self), Right(edge)))
    }

    override def ~>[Node >: N](node: Node): Path[Node, E] = {
      new NodeList(List(self, node))
    }

    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
      this ~ edge ~> node
    }
  }

  /** Path made only of nodes, i.e. consecutive nodes with no edge label between them. */
  class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) {
    override def ~[Edge](edge: Edge): Path[N, Edge] = {
      new Path(nodes.map(Left(_)) :+ Right(edge))
    }

    override def ~>[Node >: N](node: Node): Path[Node, E] = {
      new NodeList(nodes :+ node)
    }

    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
      this ~ edge ~> node
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
new file mode 100644
index 0000000..549c34f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
@@ -0,0 +1,404 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import java.util +import scala.collection.mutable.ListBuffer + +import akka.actor.Actor +import com.typesafe.config.Config +import org.slf4j.Logger + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption} +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} +import org.apache.gearpump.metrics.Metrics._ +import org.apache.gearpump.metrics.MetricsAggregator +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator} + +/** + * + * Metrics service to serve history metrics data + * + * For simplicity, HistoryMetricsService will maintain 72 hours coarse-grained data + * for last 72 hours, and fine-grained data for past 5 min. + * + * For the coarse-grained data of past 72 hours, one or two sample point will be stored + * for each hour. + * + * For fine-grained data in last 5 min, there will be 1 sample point per 15 seconds. 
*/
class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends Actor {
  private val LOG: Logger = LogUtil.getLogger(getClass, name = name)
  // One store per metric name, created lazily when the first sample of that name arrives.
  private var metricsStore = Map.empty[String, HistoryMetricsStore]
  private val systemConfig = context.system.settings.config

  def receive: Receive = metricHandler orElse commandHandler

  /** Handles incoming metric samples and the ReportMetrics handshake. */
  def metricHandler: Receive = {
    case ReportMetrics =>
      sender ! DemandMoreMetrics(self)
    case metrics: MetricType =>
      val metricName = metrics.name
      val store = metricsStore.getOrElse(metricName, {
        val created = HistoryMetricsStore(metricName, metrics, config)
        metricsStore += metricName -> created
        created
      })
      store.add(metrics)
  }

  /**
   * Translates a path pattern into a regular expression.
   *
   * `*` matches any run of characters (including none), `?` matches exactly one character
   * and every other character is matched literally. The expression is anchored at the start
   * and ends with `.*`, so a pattern matches every metric name that starts with it.
   */
  private def toRegularExpression(input: String): String = {
    "^" + input.flatMap {
      case '*' => ".*"
      case '?' => "."
      // '+' is escaped as well, otherwise it would act as a regex quantifier.
      case char if "()[]$^.{}|+\\".contains(char) => "\\" + char
      case other => s"$other"
    } + ".*$"
  }

  /**
   * Collects the items of every store whose metric name matches `pathPattern`.
   *
   * @param pathPattern pattern in the syntax accepted by toRegularExpression
   * @param readOption which view of each store to read; unknown options yield no items
   */
  private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption)
    : List[HistoryMetricsItem] = {

    val regex = toRegularExpression(pathPattern).r.pattern
    val matchingStores = metricsStore.iterator.collect {
      case (metricName, store) if regex.matcher(metricName).matches() => store
    }

    val items = readOption match {
      case ReadOption.ReadLatest => matchingStores.flatMap(_.readLatest)
      case ReadOption.ReadRecent => matchingStores.flatMap(_.readRecent)
      case ReadOption.ReadHistory => matchingStores.flatMap(_.readHistory)
      case _ => Iterator.empty // Skip all other options.
    }
    items.toList
  }

  val dummyAggregator = new DummyMetricsAggregator
  // Aggregator instances already resolved, keyed by the class name used in the query.
  private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, MetricsAggregator]

  import scala.collection.JavaConverters._
  // White list: the keys configured under GEARPUMP_METRICS_AGGREGATORS.
  private val validAggregators: Set[String] = {
    val rootConfig = systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped
    rootConfig.keySet().asScala.toSet
  }

  /**
   * Resolves the aggregator to use for a query and caches it by class name.
   *
   * A null or empty name selects the pass-through dummyAggregator. A white-listed name is
   * instantiated reflectively through its (Config) constructor. Any other name is logged and
   * mapped to a SkipAllAggregator so the query returns nothing.
   *
   * NOTE(review): a white-listed class that cannot be loaded or constructed makes this throw
   * inside the actor -- confirm that the supervisor handles that as intended.
   */
  private def resolveAggregator(aggregatorClazz: String): MetricsAggregator = {
    if (aggregatorClazz == null || aggregatorClazz.isEmpty) {
      dummyAggregator
    } else {
      aggregators.getOrElse(aggregatorClazz, {
        val created: MetricsAggregator =
          if (validAggregators.contains(aggregatorClazz)) {
            val clazz = Class.forName(aggregatorClazz)
            val constructor = clazz.getConstructor(classOf[Config])
            constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator]
          } else {
            LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, " +
              s"we will drop all messages. Please see config at ${GEARPUMP_METRICS_AGGREGATORS}")
            new SkipAllAggregator
          }
        // Cache the very instance that is returned (the original cached a second one).
        aggregators += aggregatorClazz -> created
        created
      })
    }
  }

  /** Answers QueryHistoryMetrics with the (optionally aggregated) matching metrics. */
  def commandHandler: Receive = {
    // Path accepts the wildcards ? and *: ? matches exactly one char, * matches any number
    // of chars (including none).
    case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) =>
      val aggregator = resolveAggregator(aggregatorClazz)
      val metrics = fetchMetricsHistory(inputPath, readOption).iterator
      sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, metrics))
  }
}

object HistoryMetricsService {

  /** A store holding the samples of one retention window. */
  trait MetricsStore {
    def add(inputMetrics: MetricType): Unit

    def read: List[HistoryMetricsItem]

    /**
     * read latest inserted records
     * @return
     */
    def readLatest: List[HistoryMetricsItem]
  }

  /** A store exposing three views of one metric: latest, recent and history. */
  trait HistoryMetricsStore {
    def add(inputMetrics: MetricType): Unit

    /**
     * read latest inserted records
     * @return
     */
    def readLatest: List[HistoryMetricsItem]

    def readRecent: List[HistoryMetricsItem]

    def readHistory: List[HistoryMetricsItem]
  }

  /** Store that drops every sample and always reads as empty. */
  class DummyHistoryMetricsStore extends HistoryMetricsStore {

    val empty = List.empty[HistoryMetricsItem]

    override def add(inputMetrics: MetricType): Unit = ()

    override def readRecent: List[HistoryMetricsItem] = empty

    /**
     * read latest inserted records
     * @return
     */
    override def readLatest: List[HistoryMetricsItem] = empty

    override def readHistory: List[HistoryMetricsItem] = empty
  }

  object HistoryMetricsStore {
    /** Picks the store implementation that matches the runtime type of `metric`. */
    def apply(name: String, metric: MetricType, config: HistoryMetricsConfig)
      : HistoryMetricsStore = {
      metric match {
        case _: Histogram => new HistogramMetricsStore(config)
        case _: Meter => new MeterMetricsStore(config)
        case _: Counter => new CounterMetricsStore(config)
        case _: Gauge => new GaugeMetricsStore(config)
        case _ => new DummyHistoryMetricsStore // other metrics are not supported
      }
    }
  }

  /**
   * Metrics store to store history data points
   * For each time point, we will store single data point.
   *
   * @param retainCount how many data points to retain, old data will be removed
   * @param retainIntervalMs time interval between two data points.
*/
  class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) extends MetricsStore {

    // Retained samples, newest at the head.
    private val samples = new util.ArrayDeque[HistoryMetricsItem]()
    private var latest = List.empty[HistoryMetricsItem]

    // End of the time window we are tracking
    private var windowEnd = 0L

    /** Records a sample stamped with the current wall-clock time. */
    override def add(inputMetrics: MetricType): Unit = add(inputMetrics, System.currentTimeMillis())

    /**
     * Records a sample stamped with `now`. The sample always becomes the "latest" one, but
     * it is only retained when `now` has reached the end of the current interval, so at most
     * one sample is kept per interval.
     */
    def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
      val item = HistoryMetricsItem(now, inputMetrics)
      latest = List(item)

      val opensNewInterval = now >= windowEnd
      if (opensNewInterval) {
        samples.addFirst(item)
        // Advance the boundary to the end of the interval that contains `now`.
        windowEnd = (now / retainIntervalMs + 1) * retainIntervalMs

        // Removes old data
        if (samples.size() > retainCount) {
          samples.removeLast()
        }
      }
    }

    /** Returns the retained samples, oldest first. */
    def read: List[HistoryMetricsItem] = {
      import scala.collection.JavaConverters._
      // The deque is newest-first, so walk it from the tail.
      samples.descendingIterator().asScala.toList
    }

    override def readLatest: List[HistoryMetricsItem] = latest
  }

  /**
   * Config for how long to keep history metrics data.
*
   * @param retainHistoryDataHours how many hours of history data to retain (unit: hours)
   * @param retainHistoryDataIntervalMs time interval between two history data points (unit: ms)
   * @param retainRecentDataSeconds how many seconds of recent data to retain (unit: seconds)
   * @param retainRecentDataIntervalMs time interval between two recent data points (unit: ms)
   */
  case class HistoryMetricsConfig(
      retainHistoryDataHours: Int,
      retainHistoryDataIntervalMs: Int,
      retainRecentDataSeconds: Int,
      retainRecentDataIntervalMs: Int)

  object HistoryMetricsConfig {
    /** Reads the four retention settings from `config`. */
    def apply(config: Config): HistoryMetricsConfig = {
      val historyHour = config.getInt(GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS)
      val historyInterval = config.getInt(GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS)

      val recentSeconds = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_SECONDS)
      val recentInterval = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS)
      HistoryMetricsConfig(historyHour, historyInterval, recentSeconds, recentInterval)
    }
  }

  /**
   * Number of data points that fit into a window of `windowMs` when one point is kept every
   * `intervalMs`. Computed in Long so that a large window cannot overflow Int (the original
   * Int arithmetic overflowed above 596 hours); the result is capped at Int.MaxValue.
   */
  private def retainedPoints(windowMs: Long, intervalMs: Int): Int = {
    math.min(windowMs / intervalMs, Int.MaxValue.toLong).toInt
  }

  /**
   * Shared implementation of the per-metric-type stores: every sample is written to both a
   * history window and a recent window, each a SingleValueMetricsStore sized from `config`.
   */
  abstract class RecentAndHistoryMetricsStore(config: HistoryMetricsConfig)
    extends HistoryMetricsStore {

    private val history = new SingleValueMetricsStore(
      retainedPoints(config.retainHistoryDataHours.toLong * 3600L * 1000L,
        config.retainHistoryDataIntervalMs),
      config.retainHistoryDataIntervalMs)

    private val recent = new SingleValueMetricsStore(
      retainedPoints(config.retainRecentDataSeconds.toLong * 1000L,
        config.retainRecentDataIntervalMs),
      config.retainRecentDataIntervalMs)

    override def add(inputMetrics: MetricType): Unit = {
      recent.add(inputMetrics)
      history.add(inputMetrics)
    }

    override def readRecent: List[HistoryMetricsItem] = recent.read

    override def readHistory: List[HistoryMetricsItem] = history.read

    override def readLatest: List[HistoryMetricsItem] = recent.readLatest
  }

  /** Store for Histogram metrics. */
  class HistogramMetricsStore(config: HistoryMetricsConfig)
    extends RecentAndHistoryMetricsStore(config)

  /** Store for Meter metrics. */
  class MeterMetricsStore(config: HistoryMetricsConfig)
    extends RecentAndHistoryMetricsStore(config)

  /** Store for Counter metrics. */
  class CounterMetricsStore(config: HistoryMetricsConfig)
    extends RecentAndHistoryMetricsStore(config)

  /** Store for Gauge metrics. */
  class GaugeMetricsStore(config: HistoryMetricsConfig)
    extends RecentAndHistoryMetricsStore(config)

  /** Aggregator that returns its input unchanged. */
  class DummyMetricsAggregator extends MetricsAggregator {
    def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
      : List[HistoryMetricsItem] = {
      inputs.toList
    }
  }

  /** Aggregator that discards its input and always returns an empty list. */
  class SkipAllAggregator extends MetricsAggregator {
    private val empty = List.empty[HistoryMetricsItem]
    def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
      : List[HistoryMetricsItem] = {
      empty
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
new file mode 100644
index 0000000..225f796
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
@@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.util

import java.io.File
import java.net.InetAddress
import java.util.Properties
import scala.util.Try

import com.typesafe.config.Config
import org.apache.log4j.PropertyConfigurator
import org.slf4j.{Logger, LoggerFactory}

object LogUtil {
  object ProcessType extends Enumeration {
    type ProcessType = Value
    val MASTER, WORKER, LOCAL, APPLICATION, UI = Value
  }

  /**
   * Creates a logger named after `clazz`'s simple name, optionally followed by `@` and a
   * context suffix.
   *
   * The suffix concatenates, in this order and skipping every null argument: `context`,
   * "master" + master, "worker" + worker, "app" + app, "exec" + executor, `task`, `name`.
   */
  def getLogger[T](
      clazz: Class[T], context: String = null, master: Any = null, worker: Any = null,
      executor: Any = null, task: Any = null, app: Any = null, name: String = null): Logger = {
    val env = Seq(
      Option(context),
      Option(master).map(value => "master" + value),
      Option(worker).map(value => "worker" + value),
      Option(app).map(value => "app" + value),
      Option(executor).map(value => "exec" + value),
      Option(task).map(_.toString),
      Option(name)
    ).flatten.mkString

    if (env.nonEmpty) {
      LoggerFactory.getLogger(clazz.getSimpleName + "@" + env)
    } else {
      LoggerFactory.getLogger(clazz.getSimpleName)
    }
  }

  /** Custom the log file locations by reading config from system properties */
  def loadConfiguration(config: Config, processType: ProcessType.ProcessType): Unit = {
    // Set log file name
    val propName = s"gearpump.${processType.toString.toLowerCase}.log.file"
    val props = loadConfiguration

    // Left as a ${...} placeholder on purpose; it is not expanded here.
    props.setProperty("gearpump.log.file", "${" + propName + "}")

    props.setProperty("JVM_NAME", jvmName)

    processType match {
      case ProcessType.APPLICATION =>
        props.setProperty("log4j.rootAppender", "${gearpump.application.logger}")
        props.setProperty("gearpump.application.log.rootdir",
          applicationLogDir(config).getAbsolutePath)
      case _ =>
        props.setProperty("log4j.rootAppender", "${gearpump.root.logger}")
        props.setProperty("gearpump.log.dir", daemonLogDir(config).getAbsolutePath)
    }

    PropertyConfigurator.configure(props)
  }

  /** Directory configured under Constants.GEARPUMP_LOG_DAEMON_DIR. */
  def daemonLogDir(config: Config): File = {
    val dir = config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR)
    new File(dir)
  }

  /** Reconfigures log4j with the root logger set to "DEBUG,console". */
  def verboseLogToConsole(): Unit = {
    val props = loadConfiguration
    props.setProperty("log4j.rootLogger", "DEBUG,console")
    PropertyConfigurator.configure(props)
  }

  /**
   * Loads log4j.properties from the class path.
   *
   * @return the loaded properties, or empty properties when the resource does not exist
   */
  def loadConfiguration: Properties = {
    val props = new Properties()
    val log4jConfStream = getClass().getClassLoader.getResourceAsStream("log4j.properties")
    // The stream is null when the resource is missing; closing it unconditionally, as the
    // original did, threw a NullPointerException in that case.
    if (log4jConfStream != null) {
      try {
        props.load(log4jConfStream)
      } finally {
        log4jConfStream.close()
      }
    }
    props
  }

  /** Name of the running JVM as reported by the runtime MX bean. */
  private def jvmName: String = {
    // The original also resolved the local host name here but never used the result.
    java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
  }

  /** Directory configured under Constants.GEARPUMP_LOG_APPLICATION_DIR. */
  def applicationLogDir(config: Config): File = {
    val appLogDir = config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR)
    new File(appLogDir)
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala b/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala
new file mode 100644
index 0000000..e6d79e4
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala
@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.util

import java.io.{Closeable, Flushable}
import scala.sys.process.ProcessLogger

import org.slf4j.LoggerFactory

/**
 * Redirect the console output to parent process.
 *
 * Every line is forwarded to the "redirect" logger; in addition the beginning of the
 * standard output and of the standard error is kept in memory and exposed through
 * `output` and `error`.
 */
class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable with ConsoleOutput {
  private val LOG = LoggerFactory.getLogger("redirect")

  // We only capture roughly the first 1K chars: capturing stops once the buffer is longer
  // than LENGTH, so the line that crosses the limit is still kept in full.
  private final val LENGTH = 1000
  private var _error: String = ""
  private var _output: String = ""

  def error: String = _error
  def output: String = _output

  /** Captures (while under the limit) and logs one line of standard output at INFO. */
  def out(s: => String): Unit = {
    // s is a by-name parameter: evaluate it once instead of once per use.
    val line = s
    if (_output.length <= LENGTH) {
      _output += "\n" + line
    }
    LOG.info(line)
  }

  /** Captures (while under the limit) and logs one line of standard error at ERROR. */
  def err(s: => String): Unit = {
    // Evaluate the by-name parameter once.
    val line = s
    if (_error.length <= LENGTH) {
      _error += "\n" + line
    }
    LOG.error(line)
  }

  /** No buffering: simply evaluates `f`. */
  def buffer[T](f: => T): T = f

  /** Nothing to release. */
  def close(): Unit = ()

  /** Nothing to flush. */
  def flush(): Unit = ()
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala b/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala
new file mode 100644
index 0000000..69e0d4c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala
@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.util

/**
 * Mix-in that makes equality mean identity: an instance is equal only to itself,
 * and the hash code is whatever the parent implementation provides.
 */
trait ReferenceEqual extends AnyRef {

  override def equals(other: Any): Boolean = other match {
    case ref: AnyRef => ref eq this
    case _ => false // null never equals an instance
  }

  override def hashCode(): Int = super.hashCode()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
new file mode 100644
index 0000000..97d6dd0
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import scala.concurrent.duration.Duration + +import akka.actor.ChildRestartStats + +/** + * When an executor or task fails, Gearpump will try to restart it. However, if it still fails + * after multiple retries, we abort. + * + * @param maxNrOfRetries The number of restarts that are allowed; a negative value means no + * limit. Once the limit is exceeded the policy no longer allows a restart + * @param withinTimeRange Duration of the time window for maxNrOfRetries.
*                        Duration.Inf means no window
 */
class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
  private val status = new ChildRestartStats(null, 0, 0L)

  // (retry limit, window in ms) in the form ChildRestartStats expects, where None means
  // "no limit" / "no window".
  // The original passed Some(...) unconditionally: a negative limit then denied every
  // restart, and Duration.Inf threw from toMillis while the policy was being constructed,
  // both contradicting the documented contract.
  private val retriesWindow: (Option[Int], Option[Int]) = {
    val retryLimit = if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
    val windowMs =
      if (withinTimeRange.isFinite() && withinTimeRange >= Duration.Zero) {
        Some(withinTimeRange.toMillis.toInt)
      } else {
        None
      }
    (retryLimit, windowMs)
  }

  /**
   * Registers one more restart request.
   *
   * @return whether the restart is still within the configured limit
   */
  def allowRestart: Boolean = {
    status.requestRestartPermission(retriesWindow)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala b/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala
new file mode 100644
index 0000000..1729d32
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala
@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.util

import scala.sys.process.Process

/** Read access to console text: `output` and `error` are exposed separately. */
trait ConsoleOutput {
  def output: String
  def error: String
}

/** Extends Process by providing a additional logger: ConsoleOutput interface.
*/
class RichProcess(process: Process, _logger: ConsoleOutput) extends Process {
  // Delegates to the wrapped process.
  def exitValue(): scala.Int = process.exitValue()
  // Delegates to the wrapped process.
  def destroy(): scala.Unit = process.destroy()
  // The ConsoleOutput handed in at construction time.
  def logger: ConsoleOutput = _logger
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala b/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala
new file mode 100644
index 0000000..0e2e27c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.gearpump.util

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef}
import akka.pattern.ask

/**
 * Mix-in for actors that need to ask another actor and react when no answer arrives.
 */
trait TimeOutScheduler {
  this: Actor =>
  import context.dispatcher

  /**
   * Asks `target` with `msg`. A reply is forwarded to this actor's own mailbox; if the ask
   * fails for any reason (a timeout after `milliSeconds` included), `timeOutHandler` is
   * evaluated instead.
   *
   * NOTE(review): `timeOutHandler` is evaluated inside the future callback -- confirm that
   * callers do not touch actor state from it.
   */
  def sendMsgWithTimeOutCallBack(
      target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = {
    val timeout = FiniteDuration(milliSeconds, TimeUnit.MILLISECONDS)
    val reply = target.ask(msg)(timeout)
    reply.foreach(answer => self ! answer)
    reply.failed.foreach(_ => timeOutHandler)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala
new file mode 100644
index 0000000..19bd5a8
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -0,0 +1,181 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.gearpump.util

import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import java.net.{ServerSocket, URI}
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.sys.process.Process
import scala.util.{Failure, Success, Try}

import com.typesafe.config.{Config, ConfigFactory}

import org.apache.gearpump.cluster.AppJar
import org.apache.gearpump.jarstore.JarStoreService
import org.apache.gearpump.transport.HostPort

object Util {
  val LOG = LogUtil.getLogger(getClass)
  private val defaultUri = new URI("file:///")
  // NOTE(review): the trailing `+` requires at least two characters, so a one-character
  // name is rejected -- confirm that this is intended.
  private val appNamePattern = "^[a-zA-Z_][a-zA-Z0-9_]+$".r.pattern

  /** Whether `appName` starts with a letter or `_` and continues with letters, digits or `_`. */
  def validApplicationName(appName: String): Boolean = {
    appNamePattern.matcher(appName).matches()
  }

  /** The entries of the `java.class.path` system property. */
  def getCurrentClassPath: Array[String] = {
    System.getProperty("java.class.path").split(File.pathSeparator)
  }

  /**
   * Reads the version from the first line of the VERSION file under the Gearpump home
   * directory, with the `version:=` prefix stripped.
   *
   * @return the version, or "Unknown-Version" when the file cannot be read
   */
  def version: String = {
    val home = System.getProperty(Constants.GEARPUMP_HOME)
    val parsed = Try {
      val reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(new File(home, "VERSION"))))
      // Close the file even when reading fails; the original leaked it in that case.
      try {
        reader.readLine().replace("version:=", "")
      } finally {
        reader.close()
      }
    }
    parsed match {
      case Success(value) =>
        value
      case Failure(ex) =>
        LOG.error("failed to read VERSION file, " + ex.getMessage)
        "Unknown-Version"
    }
  }

  /**
   * Starts `mainClass` in a new JVM, using the java binary of the current `java.home`.
   *
   * @param options JVM options placed before `-cp`
   * @param classPath class path entries, joined with the platform path separator
   * @param mainClass fully qualified main class
   * @param arguments program arguments
   * @return the running process together with the redirector capturing its console output
   */
  def startProcess(options: Array[String], classPath: Array[String], mainClass: String,
      arguments: Array[String]): RichProcess = {
    val java = System.getProperty("java.home") + "/bin/java"
    val command = List(java) ++ options ++
      List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments
    LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " +
      s"\n ${options.mkString(" ")}")
    val logger = new ProcessLogRedirector()
    val process = Process(command).run(logger)
    new RichProcess(process, logger)
  }

  /**
   * hostList format: host1:port1,host2:port2,host3:port3...
   *
   * Whitespace around entries is ignored and empty entries are skipped, so an empty string
   * or a trailing comma yields no entry instead of an exception. An entry without a port or
   * with a non-numeric port still throws.
   */
  def parseHostList(hostList: String): List[HostPort] = {
    hostList.split(",").map(_.trim).filter(_.nonEmpty).map { address =>
      val hostAndPort = address.split(":")
      HostPort(hostAndPort(0), hostAndPort(1).toInt)
    }.toList
  }

  /**
   * Turns a path that has neither a scheme nor a fragment into a `file://` URI string built
   * from its canonical form (with `/` separators); any other path is returned unchanged.
   */
  def resolvePath(path: String): String = {
    val uri = new URI(path)
    if (uri.getScheme == null && uri.getFragment == null) {
      val absolutePath = new File(path).getCanonicalPath.replaceAll("\\\\", "/")
      "file://" + absolutePath
    } else {
      path
    }
  }

  /** Whether `path` has no scheme and no authority, or uses the `file` scheme. */
  def isLocalPath(path: String): Boolean = {
    val uri = new URI(path)
    val scheme = uri.getScheme
    (scheme == null && uri.getAuthority == null) || scheme == defaultUri.getScheme
  }

  /** A random non-negative Int. */
  def randInt(): Int = {
    // Clear the sign bit: Math.abs(Int.MinValue) is still negative.
    ThreadLocalRandom.current.nextInt() & Int.MaxValue
  }

  /** Asks the OS for a free port by binding a throw-away server socket to port 0. */
  def findFreePort(): Try[Int] = {
    Try {
      val socket = new ServerSocket(0)
      // Always release the socket, also when reading the port fails.
      try {
        socket.setReuseAddress(true)
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  }

  /** Copies `jarFile` into the jar store and returns its name with the remote path. */
  def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = {
    val remotePath = jarStoreService.copyFromLocal(jarFile)
    AppJar(jarFile.getName, remotePath)
  }

  /**
   * This util can be used to filter out configuration from specific origin
   *
   * For example, if you want to filter out configuration from reference.conf
   * Then you can use like this:
   *
   * filterOutOrigin(config, "reference.conf")
   */
  import scala.collection.JavaConverters._
  def filterOutOrigin(config: Config, originFile: String): Config = {
    config.entrySet().asScala.foldLeft(ConfigFactory.empty()) { (kept, entry) =>
      val value = entry.getValue
      if (value.origin().resource() == originFile) {
        kept
      } else {
        kept.withValue(entry.getKey, value)
      }
    }
  }

  case class JvmSetting(vmargs: Array[String], classPath: Array[String])

  case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting)

  /**
   * Get an effective AppJvmSettings from Config.
   *
   * Every setting that is missing or unreadable falls back to an empty array.
   */
  def resolveJvmSetting(conf: Config): AppJvmSettings = {

    import org.apache.gearpump.util.Constants._

    // Reads `key`, splits it on `separatorRegex` and drops empty parts.
    def readList(key: String, separatorRegex: String): Array[String] = {
      Try(conf.getString(key).split(separatorRegex).filter(_.nonEmpty))
        .getOrElse(Array.empty[String])
    }

    val appMasterVMArgs = readList(GEARPUMP_APPMASTER_ARGS, "\\s+")
    val executorVMArgs = readList(GEARPUMP_EXECUTOR_ARGS, "\\s+")

    // NOTE(review): the AppMaster class path is split on both ';' and ':' while the executor
    // class path is split on the platform separator only -- confirm which one is intended
    // ("[;:]" also splits Windows drive letters).
    val appMasterClassPath = readList(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, "[;:]")
    val executorClassPath = readList(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, File.pathSeparator)

    AppJvmSettings(
      JvmSetting(appMasterVMArgs, appMasterClassPath),
      JvmSetting(executorVMArgs, executorClassPath))
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 0faadd9..fb5f594 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -80,7 +80,7 @@ log4j.appender.RFA.MaxBackupIndex=30
 #
 # Hadoop Filesystem Rolling File Appender, similar to RFA but writing to Hadoop Filesystem instead of local disk.
# -log4j.appender.HDFSRFA=io.gearpump.util.HadoopFSLogAppender +log4j.appender.HDFSRFA=org.apache.gearpump.util.HadoopFSLogAppender log4j.appender.HDFSRFA.File=${gearpump.log.dir}/${gearpump.log.file} # Logfile size and and 30 backups @@ -103,7 +103,7 @@ log4j.appender.console.layout.ConversionPattern=[%p] [%d{MM/dd/yyyy HH:mm:ss.SSS # # Application Log Appender # -log4j.appender.ALA=io.gearpump.util.HadoopFSLogAppender +log4j.appender.ALA=org.apache.gearpump.util.HadoopFSLogAppender log4j.appender.ALA.File=${gearpump.application.log.dir}/${gearpump.application.log.file} # Logfile size and and 30 backups http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/resources/test.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf index ac18c88..58e5ef1 100644 --- a/core/src/test/resources/test.conf +++ b/core/src/test/resources/test.conf @@ -14,7 +14,7 @@ gearpump { application.executor-num = 1 - worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" + worker.executor-process-launcher = "org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher" cluster { masters = [] @@ -22,9 +22,9 @@ gearpump { streaming.register-task-timeout-ms = 5000 - transport.serializer = "io.gearpump.transport.MockTransportSerializer" + transport.serializer = "org.apache.gearpump.transport.MockTransportSerializer" - serialization-framework = "io.gearpump.serializer.FastKryoSerializationFramework" + serialization-framework = "org.apache.gearpump.serializer.FastKryoSerializationFramework" } ## Configurations only visible on Linux or Mac.. 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/TestProbeUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/TestProbeUtil.scala b/core/src/test/scala/io/gearpump/TestProbeUtil.scala deleted file mode 100644 index e7181db..0000000 --- a/core/src/test/scala/io/gearpump/TestProbeUtil.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump

import scala.language.implicitConversions

import akka.actor.{Actor, Props, Terminated}
import akka.testkit.TestProbe

object TestProbeUtil {

  /**
   * Implicitly converts a TestProbe into Props for an actor that forwards every
   * message it receives to the probe.
   *
   * The created actor watches the probe's ActorRef and stops itself when that
   * ref terminates.
   */
  implicit def toProps(probe: TestProbe): Props = {
    Props(new Actor {
      val probeRef = probe.ref
      context.watch(probeRef)

      def receive: Receive = {
        // Backticks make this a stable-identifier pattern that compares against the
        // watched ref; a bare `probeRef` here would bind a new variable shadowing the
        // field and match any Terminated message.
        case Terminated(`probeRef`) => context.stop(self)
        case x => probeRef.forward(x)
      }
    })
  }
}
package io.gearpump.cluster

import java.io.File
import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException}
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal

import akka.actor.{Actor, ActorSystem, Address, Props}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}

import io.gearpump.cluster.MasterHarness.MockMaster
import io.gearpump.util.Constants._
import io.gearpump.util.{ActorUtil, FileUtils, LogUtil}

// ---------------------------------------------------------------------------
// core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
// ---------------------------------------------------------------------------

/**
 * Test mix-in that owns an ActorSystem named `MASTER` and remembers the host and
 * port it is bound to, so tests can point other processes or configs at it.
 *
 * Implementors provide `config`; `startActorSystem()` must be called before any
 * member that uses the actor system.
 */
trait MasterHarness {
  private val LOG = LogUtil.getLogger(getClass)

  implicit val pool = MasterHarness.cachedPool

  // Populated by startActorSystem(); null / 0 until then.
  private var system: ActorSystem = null
  private var systemAddress: Address = null
  private var host: String = null
  private var port: Int = 0
  // Mutated in place by startActorSystem(), never reassigned.
  private val masterProperties = new Properties()
  val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS)

  def getActorSystem: ActorSystem = system
  def getHost: String = host
  def getPort: Int = port

  /** Configuration used to create the actor system. */
  protected def config: Config

  /**
   * Creates the actor system from `config`, records its host and port, and stores
   * the master address and hostname in `masterProperties`.
   */
  def startActorSystem(): Unit = {
    val systemConfig = config
    system = ActorSystem(MASTER, systemConfig)
    systemAddress = ActorUtil.getSystemAddress(system)
    host = systemAddress.host.get
    port = systemAddress.port.get

    masterProperties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$getHost:$getPort")
    masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")

    LOG.info(s"Actor system is started, $host, $port")
  }

  /** Terminates the actor system and blocks until termination has completed. */
  def shutdownActorSystem(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
    LOG.info(s"Actor system is stopped, $host, $port")
  }

  /**
   * Renders `test.conf`, with the master list replaced by the single entry
   * `host:port`, into a new temporary `.conf` file.
   *
   * @return the temporary file that was written
   */
  def convertTestConf(host: String, port: Int): File = {
    val test = ConfigFactory.parseResourcesAnySyntax("test.conf",
      ConfigParseOptions.defaults.setAllowMissing(true))

    val newConf = test.withValue(GEARPUMP_CLUSTER_MASTERS,
      ConfigValueFactory.fromAnyRef(List(s"$host:$port").asJava))

    val confFile = File.createTempFile("main", ".conf")
    FileUtils.write(confFile, newConf.root().render())
    confFile
  }

  /**
   * Registers a [[MockMaster]] under the name `MASTER` and returns the probe that
   * receives every message forwarded by it.
   */
  def createMockMaster(): TestProbe = {
    val masterReceiver = TestProbe()(system)
    system.actorOf(Props(classOf[MockMaster], masterReceiver), MASTER)
    masterReceiver
  }

  /**
   * Probes `host:port` with a TCP connect using a one second timeout.
   *
   * @return false when the connect times out or the host cannot be resolved;
   *         true when the connect succeeds or fails for any other non-fatal reason
   */
  def isPortUsed(host: String, port: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.setReuseAddress(true)
      socket.connect(new InetSocketAddress(host, port), 1000)
      true
    } catch {
      case _: SocketTimeoutException => false
      case _: UnknownHostException => false
      // For other case, we think the port has been occupied.
      // NonFatal lets VM errors propagate instead of being reported as "port in use".
      case NonFatal(_) => true
    } finally {
      socket.close()
    }
  }

  /**
   * Returns the URL paths of the context class loader, or of its parent when the
   * context loader itself is not a URLClassLoader.
   */
  def getContextClassPath: Array[String] = {
    val contextLoader = Thread.currentThread().getContextClassLoader()

    val urlLoader = contextLoader match {
      case loader: URLClassLoader => loader
      // Throws ClassCastException if the parent is not a URLClassLoader either.
      case other => other.getParent.asInstanceOf[URLClassLoader]
    }

    urlLoader.getURLs().map(url => new File(url.getPath()).toString)
  }

  /**
   * Remove trailing $
   */
  def getMainClassName(mainObj: Any): String = {
    mainObj.getClass.getName.dropRight(1)
  }

  /** Renders every entry of `masterProperties` as a `-Dkey=value` JVM option. */
  def getMasterListOption(): Array[String] = {
    masterProperties.asScala.toList.map { case (key, value) =>
      s"-D$key=$value"
    }.toArray
  }

  /** `masterProperties` parsed as a Config, falling back to the actor system's config. */
  def masterConfig: Config = {
    ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config)
  }
}

object MasterHarness {

  val cachedPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

  /** Actor that forwards every message it receives to `receiver`. */
  class MockMaster(receiver: TestProbe) extends Actor {
    def receive: Receive = {
      case msg =>
        receiver.ref forward msg
    }
  }
}

// ---------------------------------------------------------------------------
// core/src/test/scala/io/gearpump/cluster/TestUtil.scala
// ---------------------------------------------------------------------------

/** Shared test configurations and a dummy application description. */
object TestUtil {
  import akka.actor._

  val DEFAULT_CONFIG = ClusterConfig.default("test.conf")
  val MASTER_CONFIG = ClusterConfig.master("test.conf")
  val UI_CONFIG = ClusterConfig.ui("test.conf")

  /** Application master that sends `(context, app)` to the master proxy on construction. */
  class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends ApplicationMaster {
    context.masterProxy ! ((context, app))

    // NOTE(review): a null behavior means this actor cannot handle any message -
    // confirm it is never sent one.
    def receive: Receive = null
  }

  val dummyApp: AppDescription =
    AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty)
}
