http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala index 8847df2..0996381 100644 --- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala +++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.cluster.master import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig +/** Master status. Synced means all masters are live and synced. */ object MasterStatus { type Type = String val Synced = "synced" val UnSynced = "unsynced" } - case class MasterNode(host: String, port: Int) { def toTuple: (String, Int) = { (host, port) @@ -33,18 +34,18 @@ case class MasterNode(host: String, port: Int) { } /** - * Master information for REST API call + * Master information returned for REST API call */ case class MasterSummary( - leader: MasterNode, - cluster: List[MasterNode], - aliveFor: Long, - logFile: String, - jarStore: String, - masterStatus: MasterStatus.Type, - homeDirectory: String, - activities: List[MasterActivity], - jvmName: String, - historyMetricsConfig: HistoryMetricsConfig = null) + leader: MasterNode, + cluster: List[MasterNode], + aliveFor: Long, + logFile: String, + jarStore: String, + masterStatus: MasterStatus.Type, + homeDirectory: String, + activities: List[MasterActivity], + jvmName: String, + historyMetricsConfig: HistoryMetricsConfig = null) case class MasterActivity(time: Long, event: String) \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala index da17829..b25162e 100644 --- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala +++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,50 +15,66 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.cluster.scheduler import akka.actor.ActorRef -import io.gearpump.WorkerId -case class Resource(slots : Int) { - def +(other : Resource): Resource = Resource(slots + other.slots) +import io.gearpump.cluster.worker.WorkerId + +case class Resource(slots: Int) { - def -(other : Resource): Resource = Resource(slots - other.slots) + // scalastyle:off spaces.after.plus + def +(other: Resource): Resource = Resource(slots + other.slots) + // scalastyle:on spaces.after.plus - def >(other : Resource): Boolean = slots > other.slots + def -(other: Resource): Resource = Resource(slots - other.slots) - def >=(other : Resource): Boolean = !(this < other) + def >(other: Resource): Boolean = slots > other.slots - def <(other : Resource): Boolean = slots < other.slots + def >=(other: Resource): Boolean = !(this < other) - def <=(other : Resource): Boolean = !(this > other) + def <(other: Resource): Boolean = slots < other.slots - def equals(other : Resource): Boolean = slots == other.slots + def <=(other: Resource): Boolean = !(this > other) - def isEmpty: Boolean = slots == 0 + def isEmpty: Boolean = { + slots == 0 + } } -object Priority extends Enumeration{ +/** + * Each streaming job can have a priority, the job with higher priority + * will get scheduled resource earlier than those with lower priority. + */ +object Priority extends Enumeration { type Priority = Value val LOW, NORMAL, HIGH = Value } -object Relaxation extends Enumeration{ +/** + * Relaxation.ONEWORKER means only resource (slot) from that worker will be accepted by + * the requestor application job. + */ +object Relaxation extends Enumeration { type Relaxation = Value // Option ONEWORKER allow user to schedule a task on specific worker. val ANY, ONEWORKER, SPECIFICWORKER = Value } -import Relaxation._ -import Priority._ -case class ResourceRequest(resource: Resource, workerId: WorkerId, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1) +import io.gearpump.cluster.scheduler.Priority._ +import io.gearpump.cluster.scheduler.Relaxation._ + +case class ResourceRequest( + resource: Resource, workerId: WorkerId, priority: Priority = NORMAL, + relaxation: Relaxation = ANY, executorNum: Int = 1) -case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : WorkerId) +case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId) object Resource { - def empty = new Resource(0) + def empty: Resource = new Resource(0) - def min(res1: Resource, res2: Resource) = if (res1.slots < res2.slots) res1 else res2 + def min(res1: Resource, res2: Resource): Resource = if (res1.slots < res2.slots) res1 else res2 } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala index a7e7dd2..8581467 100644 --- a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala +++ b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,37 +15,42 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.cluster.worker import com.typesafe.config.Config -import io.gearpump.util.RichProcess + import io.gearpump.cluster.scheduler.Resource +import io.gearpump.util.RichProcess /** - * ExecutorProcessLauncher is used to launch a process for Executor using given parameters. - * User can implement this interface to decide the behavior of launching a process. - * Set "gearpump.worker.executor-process-launcher" to your implemented class name. - */ + * ExecutorProcessLauncher is used to launch a process for Executor using given parameters. + * + * User can implement this interface to decide the behavior of launching a process. + * Set "gearpump.worker.executor-process-launcher" to your implemented class name. + */ trait ExecutorProcessLauncher { val config: Config /** - * This function will launch a process for Executor using given parameters. - * @param appId The appId of the executor to be launched - * @param executorId The executorId of the executor to be launched - * @param resource The resource allocated for that executor - * @param options The command options - * @param classPath The classpath of the process - * @param mainClass The main class of the process - * @param arguments The rest arguments - */ - def createProcess(appId: Int, executorId:Int, resource: Resource, config: Config, options : Array[String], - classPath : Array[String], mainClass : String, arguments : Array[String]): RichProcess + * This function launches a process for Executor using given parameters. + * + * @param appId The appId of the executor to be launched + * @param executorId The executorId of the executor to be launched + * @param resource The resource allocated for that executor + * @param options The command options + * @param classPath The classpath of the process + * @param mainClass The main class of the process + * @param arguments The rest arguments + */ + def createProcess( + appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], + classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess /** - * This function will clean resources for a launched process. - * @param appId The appId of the launched executor - * @param executorId The executorId of launched executor - */ + * This function will clean resources for a launched process. + * @param appId The appId of the launched executor + * @param executorId The executorId of launched executor + */ def cleanProcess(appId: Int, executorId: Int): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala new file mode 100644 index 0000000..24c6ad2 --- /dev/null +++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.cluster.worker + +/** + * WorkerId is used to uniquely track a worker machine. + * + * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that + * sessionId is **NOT** unique, so always use WorkerId for comparison. + * @param registerTime the timestamp when a worker node register itself to master node + */ +case class WorkerId(sessionId: Int, registerTime: Long) + +object WorkerId { + val unspecified: WorkerId = new WorkerId(-1, 0L) + + def render(workerId: WorkerId): String = { + workerId.registerTime + "_" + workerId.sessionId + } + + def parse(str: String): WorkerId = { + val pair = str.split("_") + new WorkerId(pair(1).toInt, pair(0).toLong) + } + + implicit val workerIdOrdering: Ordering[WorkerId] = { + new Ordering[WorkerId] { + + /** Compare timestamp first, then id */ + override def compare(x: WorkerId, y: WorkerId): Int = { + if (x.registerTime < y.registerTime) { + -1 + } else if (x.registerTime == y.registerTime) { + if (x.sessionId < y.sessionId) { + -1 + } else if (x.sessionId == y.sessionId) { + 0 + } else { + 1 + } + } else { + 1 + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala index ca700dc..cdf2d03 100644 --- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala +++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,32 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.gearpump.cluster.worker -import akka.actor.ActorRef -import io.gearpump.WorkerId +package io.gearpump.cluster.worker import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig /** * Worker summary information for REST API. */ case class WorkerSummary( - workerId: WorkerId, - state: String, - actorPath: String, - aliveFor: Long, - logFile: String, - executors: Array[ExecutorSlots], - totalSlots: Int, - availableSlots: Int, - homeDirectory: String, - jvmName: String, - // Id used to uniquely identity this worker process in low level resource manager like YARN. - resourceManagerContainerId: String, - historyMetricsConfig: HistoryMetricsConfig = null) + workerId: WorkerId, + state: String, + actorPath: String, + aliveFor: Long, + logFile: String, + executors: Array[ExecutorSlots], + totalSlots: Int, + availableSlots: Int, + homeDirectory: String, + jvmName: String, + // Id used to uniquely identity this worker process in low level resource manager like YARN. + resourceManagerContainerId: String, + historyMetricsConfig: HistoryMetricsConfig = null) -object WorkerSummary{ - def empty = WorkerSummary(WorkerId.unspecified, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "") +object WorkerSummary { + def empty: WorkerSummary = { + WorkerSummary(WorkerId.unspecified, "", "", 0L, "", + Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "") + } } case class ExecutorSlots(appId: Int, executorId: Int, slots: Int) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala index ba01c94..54d5431 100644 --- a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala +++ b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.jarstore import java.io.File import java.net.URI import java.util.ServiceLoader +import scala.collection.JavaConverters._ -import akka.actor.{ActorSystem, ActorRefFactory} +import akka.actor.ActorSystem import com.typesafe.config.Config -import io.gearpump.util.{Constants, Util} -import scala.collection.JavaConverters._ +import io.gearpump.util.{Constants, Util} case class FilePath(path: String) @@ -39,7 +40,7 @@ trait JarStoreService { * Like "hdfs" for HDFS file system, and "file" for a local * file system. */ - val scheme : String + val scheme: String /** * Init the Jar Store. @@ -47,13 +48,14 @@ trait JarStoreService { def init(config: Config, system: ActorSystem) /** - * This function will copy the local file to the remote JarStore, called from client side. + * This function will copy the local file to the remote JarStore, called from client side. * @param localFile The local file */ def copyFromLocal(localFile: File): FilePath /** - * This function will copy the remote file to local file system, called from client side. + * This function will copy the remote file to local file system, called from client side. + * * @param localFile The destination of file path * @param remotePath The remote file path from JarStore */ @@ -64,7 +66,8 @@ object JarStoreService { /** * Get a active JarStoreService by specifying a scheme. - * Please see config [[Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more + * + * Please see config [[io.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more * information. */ def get(config: Config): JarStoreService = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala index 269f8f3..3a581fb 100644 --- a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala +++ b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,14 @@ package io.gearpump.metrics +import scala.collection.JavaConverters._ + import akka.actor.{ActorRef, ActorSystem} -import io.gearpump.metrics.Metrics.{Histogram => HistogramData, Meter => MeterData, Counter => CounterData, Gauge => GaugeData} -import io.gearpump.codahale.metrics.MetricRegistry -import akka.actor.ActorSystem -import io.gearpump.codahale.metrics.{Gauge => CodaGauge} +import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry} +import io.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData} import io.gearpump.metrics.MetricsReporterService.ReportTo import io.gearpump.util.LogUtil -import scala.collection.JavaConverters._ /** * A reporter class for logging metrics values to a remote actor periodically @@ -34,7 +33,7 @@ import scala.collection.JavaConverters._ class AkkaReporter( system: ActorSystem, registry: MetricRegistry) - extends ReportTo{ + extends ReportTo { private val LOG = LogUtil.getLogger(getClass) LOG.info("Start Metrics AkkaReporter") @@ -57,7 +56,7 @@ class AkkaReporter( s.get95thPercentile, s.get99thPercentile, s.get999thPercentile) } - meters.entrySet().asScala.foreach{pair => + meters.entrySet().asScala.foreach { pair => val key = pair.getKey val value = pair.getValue to ! MeterData(key, @@ -67,7 +66,7 @@ class AkkaReporter( getRateUnit) } - gauges.entrySet().asScala.foreach {kv => + gauges.entrySet().asScala.foreach { kv => val value = kv.getValue.asInstanceOf[CodaGauge[Number]].getValue.longValue() to ! GaugeData(kv.getKey, value) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Counter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/Counter.scala b/core/src/main/scala/io/gearpump/metrics/Counter.scala index e0c9c57..70c7bae 100644 --- a/core/src/main/scala/io/gearpump/metrics/Counter.scala +++ b/core/src/main/scala/io/gearpump/metrics/Counter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,9 +21,9 @@ package io.gearpump.metrics import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter} /** - * sampleRate: take a data point for every sampleRate... + * @see io.gearpump.codahale.metrics.Counter */ -class Counter(val name : String, counter : CodaHaleCounter, sampleRate : Int = 1) { +class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) { private var sampleCount = 0L private var toBeIncremented = 0L http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Histogram.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/Histogram.scala b/core/src/main/scala/io/gearpump/metrics/Histogram.scala index 2ad06c3..4673050 100644 --- a/core/src/main/scala/io/gearpump/metrics/Histogram.scala +++ b/core/src/main/scala/io/gearpump/metrics/Histogram.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,23 +21,23 @@ package io.gearpump.metrics import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram} /** - * sampleRate: take a data point for every sampleRate... + * @see io.gearpump.codahale.metrics.Histogram */ -class Histogram(val name : String, hisgram : CodaHaleHistogram, sampleRate : Int = 1) { +class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) { private var sampleCount = 0L def update(value: Long) { sampleCount += 1 - if (null != hisgram && sampleCount % sampleRate == 0) { - hisgram.update(value) + if (null != histogram && sampleCount % sampleRate == 0) { + histogram.update(value) } } - def getMean() : Double = { - hisgram.getSnapshot.getMean + def getMean(): Double = { + histogram.getSnapshot.getMean } - def getStdDev() : Double = { - hisgram.getSnapshot.getStdDev + def getStdDev(): Double = { + histogram.getSnapshot.getStdDev } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala index 606a033..28d420a 100644 --- a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala +++ b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala @@ -1,11 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.metrics import java.util - -import io.gearpump.codahale.metrics.jvm.{ThreadStatesGaugeSet, MemoryUsageGaugeSet} -import io.gearpump.codahale.metrics.{Gauge, Metric, MetricSet} import scala.collection.JavaConverters._ +import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet} +import io.gearpump.codahale.metrics.{Metric, MetricSet} + class JvmMetricsSet(name: String) extends MetricSet { override def getMetrics: util.Map[String, Metric] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Meter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/Meter.scala b/core/src/main/scala/io/gearpump/metrics/Meter.scala index 660029c..ca79a37 100644 --- a/core/src/main/scala/io/gearpump/metrics/Meter.scala +++ b/core/src/main/scala/io/gearpump/metrics/Meter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,7 +20,8 @@ package io.gearpump.metrics import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter} -class Meter(val name : String, meter : CodaHaleMeter, sampleRate : Int = 1) { +/** See io.gearpump.codahale.metrics.Meter */ +class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) { private var sampleCount = 0L private var toBeMarked = 0L @@ -37,7 +38,7 @@ class Meter(val name : String, meter : CodaHaleMeter, sampleRate : Int = 1) { } } - def getOneMinuteRate() : Double = { + def getOneMinuteRate(): Double = { meter.getOneMinuteRate } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Metrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/Metrics.scala b/core/src/main/scala/io/gearpump/metrics/Metrics.scala index ba9a59f..aad1af0 100644 --- a/core/src/main/scala/io/gearpump/metrics/Metrics.scala +++ b/core/src/main/scala/io/gearpump/metrics/Metrics.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,39 +18,40 @@ package io.gearpump.metrics +import scala.collection.JavaConverters._ import akka.actor._ +import org.slf4j.Logger + import io.gearpump.codahale.metrics._ import io.gearpump.metrics import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.JavaConverters._ +/** Metric objects registry */ class Metrics(sampleRate: Int) extends Extension { val registry = new MetricRegistry() - def meter(name : String) = { + def meter(name: String): metrics.Meter = { new metrics.Meter(name, registry.meter(name), sampleRate) } - def histogram(name : String) = { + def histogram(name: String): Histogram = { new Histogram(name, registry.histogram(name), sampleRate) } - def histogram(name : String, sampleRate: Int) = { + def histogram(name: String, sampleRate: Int): Histogram = { new Histogram(name, registry.histogram(name), sampleRate) } - def counter(name : String) = { + def counter(name: String): Counter = { new Counter(name, registry.counter(name), sampleRate) } def register(set: MetricSet): Unit = { val names = registry.getNames - val metrics = set.getMetrics.asScala.filterKeys {key => !names.contains(key)} - metrics.foreach{kv => + val metrics = set.getMetrics.asScala.filterKeys { key => !names.contains(key) } + metrics.foreach { kv => registry.register(kv._1, kv._2) } } @@ -88,10 +89,10 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider { } } - case class Histogram - (name: String, mean: Double, - stddev: Double, median: Double, - p95: Double, p99: Double, p999: Double) + case class Histogram ( + name: String, mean: Double, + stddev: Double, median: Double, + p95: Double, p99: Double, p999: Double) extends MetricType case class Counter(name: String, value: Long) extends MetricType @@ -118,7 +119,7 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider { override def get(system: ActorSystem): Metrics = super.get(system) - override def lookup = Metrics + override def lookup: ExtensionId[Metrics] = Metrics override def createExtension(system: ExtendedActorSystem): Metrics = { val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED) @@ -133,27 +134,27 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider { } class DummyMetrics extends Metrics(1) { - override def register(set: MetricSet) = Unit + override def register(set: MetricSet): Unit = Unit private val meter = new metrics.Meter("", null) { - override def mark() = Unit - override def mark(n: Long) = Unit + override def mark(): Unit = Unit + override def mark(n: Long): Unit = Unit override def getOneMinuteRate(): Double = 0 } private val histogram = new metrics.Histogram("", null) { - override def update(value: Long) = Unit - override def getMean() : Double = 0 - override def getStdDev() : Double = 0 + override def update(value: Long): Unit = Unit + override def getMean(): Double = 0 + override def getStdDev(): Double = 0 } private val counter = new metrics.Counter("", null) { - override def inc() = Unit - override def inc(n: Long) = Unit + override def inc(): Unit = Unit + override def inc(n: Long): Unit = Unit } - override def meter(name : String) = meter - override def histogram(name : String) = histogram - override def counter(name : String) = counter + override def meter(name: String): metrics.Meter = meter + override def histogram(name: String): metrics.Histogram = histogram + override def counter(name: String): metrics.Counter = counter } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala index 6c4c34f..f52a060 100644 --- a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala +++ b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,15 @@ package io.gearpump.metrics -import java.util - import io.gearpump.cluster.MasterToClient.HistoryMetricsItem /** - * Will aggregate a full set of metrics into a smaller set + * Aggregates a larger set of metrics into a smaller set * * Sub Class must implement a constructor with signature like this: - * MetricsAggregator(config: Config) - * - * + * MetricsAggregator(config: Config) */ trait MetricsAggregator { - def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] + def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) + : List[HistoryMetricsItem] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala index c5be041..05decdd 100644 --- a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala +++ b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,16 +20,22 @@ package io.gearpump.metrics import java.net.InetSocketAddress import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ + +import akka.actor.{Actor, ActorRef} -import akka.actor.{ActorRef, ExtendedActorSystem, Actor} -import io.gearpump.codahale.metrics.{Slf4jReporter, MetricFilter, ScheduledReporter} -import io.gearpump.codahale.metrics.graphite.{GraphiteReporter, Graphite} -import io.gearpump.metrics.Metrics.{ReportMetrics, DemandMoreMetrics} -import io.gearpump.metrics.MetricsReporterService.{ReportTo} +import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter} +import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter} +import io.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics} +import io.gearpump.metrics.MetricsReporterService.ReportTo import io.gearpump.util.Constants._ import io.gearpump.util.LogUtil -import scala.concurrent.duration._ +/** + * Reports the metrics data to some where, like Ganglia, remote Akka actor, log files... + * + * @param metrics Holds a list of metrics object. + */ class MetricsReporterService(metrics: Metrics) extends Actor { private val LOG = LogUtil.getLogger(getClass) @@ -40,14 +46,15 @@ class MetricsReporterService(metrics: Metrics) extends Actor { implicit val dispatcher = context.dispatcher def receive: Receive = { + // The subscriber is demanding more messages. case DemandMoreMetrics(subscriber) => { reporter.report(subscriber) - context.system.scheduler.scheduleOnce(reportInterval milliseconds, + context.system.scheduler.scheduleOnce(reportInterval.milliseconds, subscriber, ReportMetrics) } } - def startGraphiteReporter = { + def startGraphiteReporter(): ReportTo = { val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST) val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT) @@ -64,7 +71,7 @@ class MetricsReporterService(metrics: Metrics) extends Actor { } } - def startSlf4jReporter = { + def startSlf4jReporter(): ReportTo = { new ReportTo { val reporter = Slf4jReporter.forRegistry(metrics.registry) .convertRatesTo(TimeUnit.SECONDS) @@ -77,7 +84,7 @@ class MetricsReporterService(metrics: Metrics) extends Actor { } } - def startAkkaReporter = { + def startAkkaReporter(): ReportTo = { new AkkaReporter(system, metrics.registry) } @@ -85,16 +92,18 @@ class MetricsReporterService(metrics: Metrics) extends Actor { val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER) LOG.info(s"Metrics reporter is enabled, using $reporterType reporter") val reporter = reporterType match { - case "graphite" => startGraphiteReporter - case "logfile" => startSlf4jReporter - case "akka" => startAkkaReporter + case "graphite" => startGraphiteReporter() + case "logfile" => startSlf4jReporter() + case "akka" => startAkkaReporter() } reporter } } object MetricsReporterService { - trait ReportTo{ + + /** Target where user want to report the metrics data to */ + trait ReportTo { def report(to: ActorRef): Unit } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala index 1ed94a7..1877651 100644 --- a/core/src/main/scala/io/gearpump/package.scala +++ b/core/src/main/scala/io/gearpump/package.scala @@ -1,50 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io package object gearpump { type TimeStamp = Long val LatestTime = -1 - - /** - * WorkerId is used to uniquely track a worker machine. - * - * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that - * sessionId is **NOT** unique, so always use WorkerId for comparison. - * @param registerTime the timestamp when a worker node register itself to master node - */ - case class WorkerId(sessionId: Int, registerTime: Long) - - object WorkerId { - val unspecified: WorkerId = new WorkerId(-1, 0L) - - def render(workerId: WorkerId): String = { - workerId.registerTime + "_" + workerId.sessionId - } - - def parse(str: String): WorkerId = { - val pair = str.split("_") - new WorkerId(pair(1).toInt, pair(0).toLong) - } - - implicit val workerIdOrdering: Ordering[WorkerId] = { - new Ordering[WorkerId] { - - /** Compare timestamp first, then id */ - override def compare(x: WorkerId, y: WorkerId): Int = { - if (x.registerTime < y.registerTime) { - -1 - } else if (x.registerTime == y.registerTime) { - if (x.sessionId < y.sessionId) { - -1 - } else if (x.sessionId == y.sessionId) { - 0 - } else { - 1 - } - } else { - 1 - } - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala index dba02ee..0b9c57e 100644 --- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,11 +20,13 @@ package io.gearpump.partitioner import io.gearpump.Message +/** Used by storm module to broadcast message to all downstream tasks */ class BroadcastPartitioner extends MulticastPartitioner { private var lastPartitionNum = -1 private var partitions = Array.empty[Int] - override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + override def getPartitions( + msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { if (partitionNum != lastPartitionNum) { partitions = (0 until partitionNum).toArray lastPartitionNum = partitionNum http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala index 3ed6dd4..062fc10 100644 --- a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.partitioner import io.gearpump.Message @@ -7,7 +25,7 @@ import io.gearpump.Message * And each task in current processor will co-locate with task of last processor */ class CoLocationPartitioner extends UnicastPartitioner { - override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { currentPartitionId } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala index 5e4c7bc..6ba0cd6 100644 --- a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,7 +26,7 @@ import io.gearpump.Message * same hash code after serialization and deserialization. */ class HashPartitioner extends UnicastPartitioner { - override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala index 6285bb7..69104c7 100644 --- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,22 +18,34 @@ package io.gearpump.partitioner -import io.gearpump.Message import scala.reflect.ClassTag + import org.apache.commons.lang.SerializationUtils +import io.gearpump.Message + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task + * of upstream processor A send to several tasks of downstream processor B. + */ sealed trait Partitioner extends Serializable +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does + * ONE-task {@literal ->} ONE-task mapping. + */ trait UnicastPartitioner extends Partitioner { - /** + + /** + * Gets the SINGLE downstream processor task index to send message to. * - * @param msg - * @param partitionNum - * @param currentPartitionId, used when the downstream processor want to share the same - * partition id, - * @return + * @param msg Message you want to send + * @param partitionNum How many tasks does the downstream processor have. + * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. + * + * @return ONE task index of downstream processor. */ - def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int + def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int def getPartition(msg: Message, partitionNum: Int): Int = { getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) @@ -41,14 +53,20 @@ trait UnicastPartitioner extends Partitioner { } trait MulticastPartitioner extends Partitioner { - def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] + + /** + * Gets a list of downstream processor task indexes to send message to. + * + * @param upstreamTaskIndex Current sender task's task index. + * + */ + def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) } } - sealed trait PartitionerFactory { def name: String @@ -56,26 +74,32 @@ sealed trait PartitionerFactory { def partitioner: Partitioner } -class PartitionerObject(private [this] val _partitioner: Partitioner) extends PartitionerFactory with Serializable { +/** Stores the Partitioner in an object. To use it, user need to deserialize the object */ +class PartitionerObject(private[this] val _partitioner: Partitioner) + extends PartitionerFactory with Serializable { override def name: String = partitioner.getClass.getName - override def partitioner: Partitioner = SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] + override def partitioner: Partitioner = { + SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] + } } -class PartitionerByClassName(partitionerClass: String) extends PartitionerFactory with Serializable { - override def name: String = partitionerClass +/** Store the partitioner in class Name, the user need to instantiate a new class */ +class PartitionerByClassName(partitionerClass: String) + extends PartitionerFactory with Serializable { - override def partitioner: Partitioner = Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] + override def name: String = partitionerClass + override def partitioner: Partitioner = { + Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] + } } - /** - * @param partitionerFactory + * @param partitionerFactory How we construct a Partitioner. */ case class PartitionerDescription(partitionerFactory: PartitionerFactory) - object Partitioner { val UNKNOWN_PARTITION_ID = -1 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala index 035c9d2..ff962fa 100644 --- a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.partitioner -import io.gearpump.Message import scala.util.Random +import io.gearpump.Message + /** * The idea of ShuffleGroupingPartitioner is derived from Storm. * Messages are randomly distributed across the downstream's tasks in a way such that http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala index d0d3c39..6b3c26e 100644 --- a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala +++ b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,17 +23,16 @@ import java.util.Random import io.gearpump.Message /** - * Round Robin partition the data. + * Round Robin partition the data to downstream processor tasks. */ class ShufflePartitioner extends UnicastPartitioner { private var seed = 0 private var count = 0 - - override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { if (seed == 0) { - seed = newSeed + seed = newSeed() } val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum @@ -41,5 +40,5 @@ class ShufflePartitioner extends UnicastPartitioner { result } - def newSeed = new Random().nextInt() + private def newSeed(): Int = new Random().nextInt() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/Authenticator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/security/Authenticator.scala b/core/src/main/scala/io/gearpump/security/Authenticator.scala index afb13a9..73bc8e1 100644 --- a/core/src/main/scala/io/gearpump/security/Authenticator.scala +++ b/core/src/main/scala/io/gearpump/security/Authenticator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,23 +17,21 @@ */ package io.gearpump.security -import io.gearpump.security.Authenticator.AuthenticationResult - import scala.concurrent.{ExecutionContext, Future} +import io.gearpump.security.Authenticator.AuthenticationResult /** * Authenticator for UI dashboard. * * Sub Class must implement a constructor with signature like this: - * this(config: Config) - * + * this(config: Config) */ trait Authenticator { - // TODO: Change the signature to return more attributes of user - // credentials... - def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] + // TODO: Change the signature to return more attributes of user credentials... + def authenticate( + user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] } object Authenticator { @@ -45,22 +43,25 @@ object Authenticator { def permissionLevel: Int } - val UnAuthenticated = new AuthenticationResult{ + val UnAuthenticated = new AuthenticationResult { override val authenticated = false override val permissionLevel = -1 } - val Guest = new AuthenticationResult{ + /** Guest can view but have no permission to submit app or write */ + val Guest = new AuthenticationResult { override val authenticated = true override val permissionLevel = 1000 } - val User = new AuthenticationResult{ + /** User can submit app, kill app, but have no permission to add or remote machines */ + val User = new AuthenticationResult { override val authenticated = true override val permissionLevel = 1000 + Guest.permissionLevel } - val Admin = new AuthenticationResult{ + /** Super user */ + val Admin = new AuthenticationResult { override val authenticated = true override val permissionLevel = 1000 + User.permissionLevel } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala index 3ecfd88..0743a3f 100644 --- a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala +++ b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,12 @@ package io.gearpump.security +import scala.concurrent.{ExecutionContext, Future} + +import com.typesafe.config.Config + import io.gearpump.security.Authenticator.AuthenticationResult import io.gearpump.security.ConfigFileBasedAuthenticator._ -import com.typesafe.config.Config -import scala.concurrent.{ExecutionContext, Future} object ConfigFileBasedAuthenticator { @@ -30,7 +32,9 @@ object ConfigFileBasedAuthenticator { private val USERS = ROOT + "." + "users" private val GUESTS = ROOT + "." + "guests" - private case class Credentials(admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) { + private case class Credentials( + admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) { + def verify(user: String, password: String): AuthenticationResult = { if (admins.contains(user)) { if (verify(user, password, admins)) { @@ -70,26 +74,32 @@ object ConfigFileBasedAuthenticator { * users have limited permission to submit an application and etc.. * guests can not submit/kill applications, but can view the application status. * - * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find information - * about how to configure this authenticator. + * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find + * information about how to configure this authenticator. * * [Security consideration] - * It will keep one-way sha1 digest of password instead of password itself. The original password is NOT - * kept in any way, so generally it is safe. + * It will keep one-way sha1 digest of password instead of password itself. The original password is + * NOT kept in any way, so generally it is safe. * - * digesting flow (from original password to digest): - * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> base64Encode * - * verification user input password with stored digest: - * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest: salt + sha1 -> - * compare the generated digest with the stored digest. + * digesting flow (from original password to digest): + * {{{ + * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> + * base64Encode. + * }}} * + * Verification user input password with stored digest: + * {{{ + * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest: + * salt + sha1 -> compare the generated digest with the stored digest. + * }}} */ class ConfigFileBasedAuthenticator(config: Config) extends Authenticator { private val credentials = loadCredentials(config) - override def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] = { + override def authenticate(user: String, password: String, ec: ExecutionContext) + : Future[AuthenticationResult] = { implicit val ctx = ec Future { credentials.verify(user, password) @@ -97,13 +107,13 @@ class ConfigFileBasedAuthenticator(config: Config) extends Authenticator { } private def loadCredentials(config: Config): Credentials = { - val admins = configToMap(config, ADMINS) - val users = configToMap(config, USERS) + val admins = configToMap(config, ADMINS) + val users = configToMap(config, USERS) val guests = configToMap(config, GUESTS) new Credentials(admins, users, guests) } - private def configToMap(config : Config, path: String) = { + private def configToMap(config: Config, path: String) = { import scala.collection.JavaConverters._ config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/PasswordUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala index f8eafdd..9bf40d2 100644 --- a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala +++ b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,9 +19,10 @@ package io.gearpump.security import java.security.MessageDigest -import sun.misc.{BASE64Decoder, BASE64Encoder} import scala.util.Try +import sun.misc.{BASE64Decoder, BASE64Encoder} + /** * Util to verify whether user input password is valid or not. * It use sha1 to do the digesting. @@ -30,9 +31,11 @@ object PasswordUtil { private val SALT_LENGTH = 8 /** - * verification user input password with stored digest: + * Verifies user input password with stored digest: + * {{{ * base64Decode -> extract salt -> do sha1(salt, password) -> * generate digest: salt + sha1 -> compare the generated digest with the stored digest. + * }}} */ def verify(password: String, stored: String): Boolean = { Try { @@ -45,7 +48,10 @@ object PasswordUtil { } /** * digesting flow (from original password to digest): - * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> base64Encode + * {{{ + * random salt byte array of length 8 -> + * byte array of (salt + sha1(salt, password)) -> base64Encode + * }}} */ def hash(password: String): String = { // Salt generation 64 bits long @@ -66,8 +72,8 @@ object PasswordUtil { } private def base64Encode(data: Array[Byte]): String = { - val endecoder = new BASE64Encoder() - endecoder.encode(data) + val endecoder = new BASE64Encoder() + endecoder.encode(data) } private def base64Decode(data: String): Array[Byte] = { @@ -75,13 +81,14 @@ object PasswordUtil { decoder.decodeBuffer(data) } - private def help = { + // scalastyle:off println + private def help() = { Console.println("usage: gear io.gearpump.security.PasswordUtil -password <your password>") } def main(args: Array[String]): Unit = { if (args.length != 2 || args(0) != "-password") { - help + help() } else { val pass = args(1) val result = hash(pass) @@ -90,4 +97,5 @@ object PasswordUtil { Console.println(result) } } + // scalastyle:on println } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala index a69ab43..cb9d563 100644 --- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala +++ b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,12 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.serializer import akka.actor.ExtendedActorSystem + import io.gearpump.cluster.UserConfig -class FastKryoSerializationFramework extends SerializationFramework{ +/** + * A build-in serializer framework using kryo + * + * NOTE: The Kryo here is a shaded version by Gearpump + */ +class FastKryoSerializationFramework extends SerializationFramework { private var system: ExtendedActorSystem = null private lazy val pool = new ThreadLocal[Serializer]() { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala index 817cd84..57b7b5e 100644 --- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala +++ b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,13 +19,14 @@ package io.gearpump.serializer import akka.actor.ExtendedActorSystem + import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy +import io.gearpump.objenesis.strategy.StdInstantiatorStrategy import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper import io.gearpump.serializer.FastKryoSerializer.KryoSerializationException import io.gearpump.util.LogUtil -import io.gearpump.objenesis.strategy.StdInstantiatorStrategy -class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{ +class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer { private val LOG = LogUtil.getLogger(getClass) private val config = system.settings.config @@ -37,7 +38,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{ kryo.setInstantiatorStrategy(strategy) private val kryoClazz = new GearpumpSerialization(config).customize(kryo) - override def serialize(message: Any) : Array[Byte] = { + override def serialize(message: Any): Array[Byte] = { try { kryoSerializer.toBinary(message) } catch { @@ -72,7 +73,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{ } } - override def deserialize(msg : Array[Byte]): Any = { + override def deserialize(msg: Array[Byte]): Any = { kryoSerializer.fromBinary(msg) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala index 41ccaa4..a7eb6cf 100644 --- a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala +++ b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,17 @@ package io.gearpump.serializer -import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} import com.typesafe.config.Config -import io.gearpump.util.{Constants, LogUtil} import org.slf4j.Logger +import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} +import io.gearpump.util.{Constants, LogUtil} + class GearpumpSerialization(config: Config) { private val LOG: Logger = LogUtil.getLogger(getClass) - def customize(kryo: Kryo): Unit = { + def customize(kryo: Kryo): Unit = { val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS) @@ -37,21 +38,22 @@ class GearpumpSerialization(config: Config) { if (value == null || value.isEmpty) { - //Use default serializer for this class type + // Use default serializer for this class type kryo.register(keyClass) } else { val valueClass = Class.forName(value) - val register = kryo.register(keyClass, valueClass.newInstance().asInstanceOf[KryoSerializer[_]]) + val register = kryo.register(keyClass, + valueClass.newInstance().asInstanceOf[KryoSerializer[_]]) LOG.debug(s"Registering ${keyClass}, id: ${register.getId}") } } kryo.setReferences(false) - // require the user to register the class first before using + // Requires the user to register the class first before using kryo.setRegistrationRequired(true) } - private final def configToMap(config : Config, path: String) = { + private final def configToMap(config: Config, path: String) = { import scala.collection.JavaConverters._ config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala index d466ccf..4947dcc 100644 --- a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala +++ b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,15 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.serializer import akka.actor.ExtendedActorSystem + import io.gearpump.cluster.UserConfig /** * User are allowed to use a customized serialization framework by extending this * interface. - * */ trait SerializationFramework { def init(system: ExtendedActorSystem, config: UserConfig) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/Serializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/serializer/Serializer.scala b/core/src/main/scala/io/gearpump/serializer/Serializer.scala index 02e30db..ff8b147 100644 --- a/core/src/main/scala/io/gearpump/serializer/Serializer.scala +++ b/core/src/main/scala/io/gearpump/serializer/Serializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,13 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.serializer /** * User defined message serializer */ trait Serializer { - def serialize(message: Any) : Array[Byte] + def serialize(message: Any): Array[Byte] - def deserialize(msg : Array[Byte]): Any + def deserialize(msg: Array[Byte]): Any } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/Express.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/Express.scala b/core/src/main/scala/io/gearpump/transport/Express.scala index e8f1b8e..101b841 100644 --- a/core/src/main/scala/io/gearpump/transport/Express.scala +++ b/core/src/main/scala/io/gearpump/transport/Express.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package io.gearpump.transport +import scala.collection.immutable.LongMap +import scala.concurrent._ + import akka.actor._ import akka.agent.Agent +import org.slf4j.Logger + import io.gearpump.transport.netty.Client.Close import io.gearpump.transport.netty.{Context, TaskMessage} import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.immutable.LongMap -import scala.concurrent._ trait ActorLookupById { + + /** Lookup actor ref for local task actor by providing a TaskId (TaskId.toLong) */ def lookupLocalActor(id: Long): Option[ActorRef] } @@ -40,8 +43,9 @@ trait ActorLookupById { */ class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById { - import io.gearpump.transport.Express._ import system.dispatcher + + import io.gearpump.transport.Express._ val localActorMap = Agent(LongMap.empty[ActorRef]) val remoteAddressMap = Agent(Map.empty[Long, HostPort]) @@ -59,15 +63,16 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku LOG.info(s"binding to netty server $localHost") system.registerOnTermination(new Runnable { - override def run = context.close + override def run(): Unit = context.close() }) (context, serverPort, localHost) } - def unregisterLocalActor(id : Long) : Unit = { + def unregisterLocalActor(id: Long): Unit = { localActorMap.sendOff(_ - id) } + /** Start Netty client actors to connect to remote machines */ def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = { val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet closeClients(clientsToClose) @@ -85,7 +90,7 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = { remoteClientMap.alter { map => - map.filterKeys(hostPorts.contains).foreach{ hostAndClient => + map.filterKeys(hostPorts.contains).foreach { hostAndClient => val (_, client) = hostAndClient client ! Close } @@ -93,36 +98,38 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku } } - def registerLocalActor(id : Long, actor: ActorRef): Unit = { + def registerLocalActor(id: Long, actor: ActorRef): Unit = { LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}") init localActorMap.sendOff(_ + (id -> actor)) } - def lookupLocalActor(id: Long) = localActorMap.get().get(id) + def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id) - def lookupRemoteAddress(id : Long) = remoteAddressMap.get().get(id) + def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id) - //transport to remote address + /** Send message to remote task */ def transport(taskMessage: TaskMessage, remote: HostPort): Unit = { val remoteClient = remoteClientMap.get.get(remote) if (remoteClient.isDefined) { remoteClient.get.tell(taskMessage, Actor.noSender) } else { - val errorMsg = s"Clients has not been launched properly before transporting messages, the destination is $remote" + val errorMsg = s"Clients has not been launched properly before transporting messages, " + + s"the destination is $remote" LOG.error(errorMsg) throw new Exception(errorMsg) } } } +/** A customized transport layer by using Akka extension */ object Express extends ExtensionId[Express] with ExtensionIdProvider { val LOG: Logger = LogUtil.getLogger(getClass) override def get(system: ActorSystem): Express = super.get(system) - override def lookup = Express + override def lookup: ExtensionId[Express] = Express override def createExtension(system: ExtendedActorSystem): Express = new Express(system) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/HostPort.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/HostPort.scala b/core/src/main/scala/io/gearpump/transport/HostPort.scala index 72da203..40c4342 100644 --- a/core/src/main/scala/io/gearpump/transport/HostPort.scala +++ b/core/src/main/scala/io/gearpump/transport/HostPort.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,7 +25,7 @@ case class HostPort(host: String, port: Int) { } object HostPort { - def apply(address : String) : HostPort = { + def apply(address: String): HostPort = { val hostAndPort = address.split(":") new HostPort(hostAndPort(0), hostAndPort(1).toInt) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Client.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Client.scala b/core/src/main/scala/io/gearpump/transport/netty/Client.scala index 2ebf09d..d5960ad 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/Client.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/Client.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,18 +23,22 @@ import java.nio.channels.ClosedChannelException import java.util import java.util.Random import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration +import scala.language.implicitConversions import akka.actor.Actor -import io.gearpump.transport.HostPort -import io.gearpump.util.LogUtil import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel._ import org.slf4j.Logger -import scala.concurrent.duration.FiniteDuration -import scala.language.implicitConversions +import io.gearpump.transport.HostPort +import io.gearpump.util.LogUtil -class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) extends Actor { +/** + * Netty Client implemented as an actor, on the other side, there is a netty server Actor. + * All messages sent to this actor will be forwarded to remote machine. + */ +class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor { import io.gearpump.transport.netty.Client._ val name = s"netty-client-$hostPort" @@ -42,7 +46,7 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex private final var bootstrap: ClientBootstrap = null private final val random: Random = new Random private val serializer = conf.newTransportSerializer - private var channel : Channel = null + private var channel: Channel = null var batch = new util.ArrayList[TaskMessage] @@ -52,25 +56,26 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex self ! Connect(0) } - def receive = messageHandler orElse connectionHandler + def receive: Receive = messageHandler orElse connectionHandler - def messageHandler : Receive = { + def messageHandler: Receive = { case msg: TaskMessage => batch.add(msg) - case flush @ Flush(flushChannel) => + case flush@Flush(flushChannel) => if (channel != flushChannel) { - Unit //Drop, as it belong to old channel flush message + Unit // Drop, as it belong to old channel flush message } else if (batch.size > 0 && flushChannel.isWritable) { send(flushChannel, batch.iterator) batch.clear() self ! flush } else { import context.dispatcher - context.system.scheduler.scheduleOnce(new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush) + context.system.scheduler.scheduleOnce( + new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush) } } - def connectionHandler : Receive = { + def connectionHandler: Receive = { case ChannelReady(channel) => this.channel = channel self ! Flush(channel) @@ -90,12 +95,12 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex context.become(closed) } - def closed : Receive = { - case msg : AnyRef => + def closed: Receive = { + case msg: AnyRef => LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...") } - private def connect(tries: Int) : Unit = { + private def connect(tries: Int): Unit = { LOG.info(s"netty client try to connect to $name, tries: $tries") if (tries <= conf.max_retries) { val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port) @@ -107,7 +112,9 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}") current.close() import context.dispatcher - context.system.scheduler.scheduleOnce(new FiniteDuration(getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1)) + context.system.scheduler.scheduleOnce( + new FiniteDuration( + getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1)) } } else { LOG.error(s"fail to connect to a remote host $name after retied $tries ...") @@ -144,7 +151,7 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex batch = null } - override def postStop() = { + override def postStop(): Unit = { close() } @@ -154,9 +161,10 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex if (channel.isOpen) { channel.close } - LOG.error(s"failed to send requests to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}") + LOG.error(s"failed to send requests " + + s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}") if (!ex.isInstanceOf[ClosedChannelException]) { - LOG.error(ex.getMessage, ex) + LOG.error(ex.getMessage, ex) } self ! CompareAndReconnectIfEqual(channel) } @@ -175,18 +183,17 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex private def isChannelWritable = (null != channel) && channel.isWritable } - object Client { val LOG: Logger = LogUtil.getLogger(getClass) - //Reconnect if current channel equals channel + // Reconnect if current channel equals channel case class CompareAndReconnectIfEqual(channel: Channel) case class Connect(tries: Int) - case class ChannelReady(chanel : Channel) + case class ChannelReady(chanel: Channel) case object Close - case class Flush(channel : Channel) + case class Flush(channel: Channel) class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler { @@ -210,7 +217,9 @@ object Client { } } - implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = new ChannelFutureOps(channel) + implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = { + new ChannelFutureOps(channel) + } class ChannelFutureOps(channelFuture: ChannelFuture) {
