http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala new file mode 100644 index 0000000..87db442 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.metrics + +import java.util +import scala.collection.JavaConverters._ + +import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet} +import io.gearpump.codahale.metrics.{Metric, MetricSet} + +class JvmMetricsSet(name: String) extends MetricSet { + + override def getMetrics: util.Map[String, Metric] = { + val memoryMetrics = new MemoryUsageGaugeSet().getMetrics.asScala + val threadMetrics = new ThreadStatesGaugeSet().getMetrics.asScala + Map( + s"$name:memory.total.used" -> memoryMetrics("total.used"), + s"$name:memory.total.committed" -> memoryMetrics("total.committed"), + s"$name:memory.total.max" -> memoryMetrics("total.max"), + s"$name:memory.heap.used" -> memoryMetrics("heap.used"), + s"$name:memory.heap.committed" -> memoryMetrics("heap.committed"), + s"$name:memory.heap.max" -> memoryMetrics("heap.max"), + s"$name:thread.count" -> threadMetrics("count"), + s"$name:thread.daemon.count" -> threadMetrics("daemon.count") + ).asJava + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala new file mode 100644 index 0000000..6d89456 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.metrics + +import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter} + +/** See org.apache.gearpump.codahale.metrics.Meter */ +class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) { + private var sampleCount = 0L + private var toBeMarked = 0L + + def mark() { + meter.mark(1) + } + + def mark(n: Long) { + toBeMarked += n + sampleCount += 1 + if (null != meter && sampleCount % sampleRate == 0) { + meter.mark(toBeMarked) + toBeMarked = 0 + } + } + + def getOneMinuteRate(): Double = { + meter.getOneMinuteRate + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala new file mode 100644 index 0000000..54d8c6a --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.metrics + +import scala.collection.JavaConverters._ + +import akka.actor._ +import org.slf4j.Logger + +import io.gearpump.codahale.metrics._ +import org.apache.gearpump.metrics +import org.apache.gearpump.util.LogUtil + +/** Metric objects registry */ +class Metrics(sampleRate: Int) extends Extension { + + val registry = new MetricRegistry() + + def meter(name: String): metrics.Meter = { + new metrics.Meter(name, registry.meter(name), sampleRate) + } + + def histogram(name: String): Histogram = { + new Histogram(name, registry.histogram(name), sampleRate) + } + + def histogram(name: String, sampleRate: Int): Histogram = { + new Histogram(name, registry.histogram(name), sampleRate) + } + + def counter(name: String): Counter = { + new Counter(name, registry.counter(name), sampleRate) + } + + def register(set: MetricSet): Unit = { + val names = registry.getNames + val metrics = set.getMetrics.asScala.filterKeys { key => !names.contains(key) } + metrics.foreach { kv => + registry.register(kv._1, kv._2) + } + } +} + +object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider { + + val LOG: Logger = LogUtil.getLogger(getClass) + import org.apache.gearpump.util.Constants._ + + sealed trait MetricType { + def name: String + } + + object MetricType { + def unapply(obj: MetricType): Option[(Histogram, Counter, Meter, Timer, Gauge)] = { + obj match { + case x: Histogram => Some((x, null, null, null, null)) + case x: Counter => Some((null, x, null, null, null)) + case x: Meter => Some((null, null, x, null, null)) + case x: Timer => Some((null, null, null, x, null)) + case g: Gauge => Some((null, null, null, null, g)) + } + } + + def apply(h: Histogram, c: Counter, m: Meter, t: Timer, g: Gauge): MetricType = { + val result = + if (h != null) h + else if (c != null) c + else if (m != null) m + else if (t != null) t + else if (g != null) g + else null + result + } + } + + case class Histogram ( + name: String, mean: Double, + stddev: Double, median: Double, + p95: Double, p99: Double, p999: Double) + extends MetricType + + case class Counter(name: String, value: Long) extends MetricType + + case class Meter( + name: String, count: Long, meanRate: Double, + m1: Double, rateUnit: String) + extends MetricType + + case class Timer( + name: String, count: Long, min: Double, max: Double, + mean: Double, stddev: Double, median: Double, + p75: Double, p95: Double, p98: Double, + p99: Double, p999: Double, meanRate: Double, + m1: Double, m5: Double, m15: Double, + rateUnit: String, durationUnit: String) + extends MetricType + + case class Gauge(name: String, value: Long) extends MetricType + + case object ReportMetrics + + case class DemandMoreMetrics(subscriber: ActorRef) + + override def get(system: ActorSystem): Metrics = super.get(system) + + override def lookup: ExtensionId[Metrics] = Metrics + + override def createExtension(system: ExtendedActorSystem): Metrics = { + val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED) + LOG.info(s"Metrics is enabled..., $metricsEnabled") + val sampleRate = system.settings.config.getInt(GEARPUMP_METRIC_SAMPLE_RATE) + if (metricsEnabled) { + val meters = new Metrics(sampleRate) + meters + } else { + new DummyMetrics + } + } + + class DummyMetrics extends Metrics(1) { + override def register(set: MetricSet): Unit = Unit + + private val meter = new metrics.Meter("", null) { + override def mark(): Unit = Unit + override def mark(n: Long): Unit = Unit + override def getOneMinuteRate(): Double = 0 + } + + private val histogram = new metrics.Histogram("", null) { + override def update(value: Long): Unit = Unit + override def getMean(): Double = 0 + override def getStdDev(): Double = 0 + } + + private val counter = new metrics.Counter("", null) { + override def inc(): Unit = Unit + override def inc(n: Long): Unit = Unit + } + + override def meter(name: String): metrics.Meter = meter + override def histogram(name: String): metrics.Histogram = histogram + override def counter(name: String): metrics.Counter = counter + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala new file mode 100644 index 0000000..08dad57 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.metrics + +import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem + +/** + * Aggregates a larger set of metrics into a smaller set + * + * Sub Class must implement a constructor with signature like this: + * MetricsAggregator(config: Config) + */ +trait MetricsAggregator { + def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]) + : List[HistoryMetricsItem] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala new file mode 100644 index 0000000..9b506af --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.metrics + +import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ + +import akka.actor.{Actor, ActorRef} + +import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter} +import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter} +import org.apache.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics} +import org.apache.gearpump.metrics.MetricsReporterService.ReportTo +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.LogUtil + +/** + * Reports the metrics data to some where, like Ganglia, remote Akka actor, log files... + * + * @param metrics Holds a list of metrics object. + */ +class MetricsReporterService(metrics: Metrics) extends Actor { + + private val LOG = LogUtil.getLogger(getClass) + private implicit val system = context.system + + private val reportInterval = system.settings.config.getInt(GEARPUMP_METRIC_REPORT_INTERVAL) + private val reporter = getReporter + implicit val dispatcher = context.dispatcher + + def receive: Receive = { + // The subscriber is demanding more messages. + case DemandMoreMetrics(subscriber) => { + reporter.report(subscriber) + context.system.scheduler.scheduleOnce(reportInterval.milliseconds, + subscriber, ReportMetrics) + } + } + + def startGraphiteReporter(): ReportTo = { + val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST) + val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT) + + val graphite = new Graphite(new InetSocketAddress(graphiteHost, graphitePort)) + LOG.info(s"reporting to $graphiteHost, $graphitePort") + new ReportTo { + private val reporter = GraphiteReporter.forRegistry(metrics.registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .filter(MetricFilter.ALL) + .build(graphite) + + override def report(to: ActorRef): Unit = reporter.report() + } + } + + def startSlf4jReporter(): ReportTo = { + new ReportTo { + val reporter = Slf4jReporter.forRegistry(metrics.registry) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .filter(MetricFilter.ALL) + .outputTo(LOG) + .build() + + override def report(to: ActorRef): Unit = reporter.report() + } + } + + def startAkkaReporter(): ReportTo = { + new AkkaReporter(system, metrics.registry) + } + + def getReporter: ReportTo = { + val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER) + LOG.info(s"Metrics reporter is enabled, using $reporterType reporter") + val reporter = reporterType match { + case "graphite" => startGraphiteReporter() + case "logfile" => startSlf4jReporter() + case "akka" => startAkkaReporter() + } + reporter + } +} + +object MetricsReporterService { + + /** Target where user want to report the metrics data to */ + trait ReportTo { + def report(to: ActorRef): Unit + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala new file mode 100644 index 0000000..b1118d3 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache + +package object gearpump { + type TimeStamp = Long + val LatestTime = -1 +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala new file mode 100644 index 0000000..99cbcb6 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.partitioner + +import org.apache.gearpump.Message + +/** Used by storm module to broadcast message to all downstream tasks */ +class BroadcastPartitioner extends MulticastPartitioner { + private var lastPartitionNum = -1 + private var partitions = Array.empty[Int] + + override def getPartitions( + msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + if (partitionNum != lastPartitionNum) { + partitions = (0 until partitionNum).toArray + lastPartitionNum = partitionNum + } + partitions + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala new file mode 100644 index 0000000..5a3eec4 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.partitioner + +import org.apache.gearpump.Message + +/** + * Will have the same parallelism with last processor + * And each task in current processor will co-locate with task of last processor + */ +class CoLocationPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + currentPartitionId + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala new file mode 100644 index 0000000..ee684a9 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.partitioner + +import org.apache.gearpump.Message + +/** + * Only make sense when the message has implemented the hashCode() + * Otherwise, it will use Object.hashCode(), which will not return + * same hash code after serialization and deserialization. + */ +class HashPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala new file mode 100644 index 0000000..d68fa65 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.partitioner + +import scala.reflect.ClassTag + +import org.apache.commons.lang.SerializationUtils + +import org.apache.gearpump.Message + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task + * of upstream processor A send to several tasks of downstream processor B. + */ +sealed trait Partitioner extends Serializable + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does + * ONE-task {@literal ->} ONE-task mapping. + */ +trait UnicastPartitioner extends Partitioner { + + /** + * Gets the SINGLE downstream processor task index to send message to. + * + * @param msg Message you want to send + * @param partitionNum How many tasks does the downstream processor have. + * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. + * + * @return ONE task index of downstream processor. + */ + def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int + + def getPartition(msg: Message, partitionNum: Int): Int = { + getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) + } +} + +trait MulticastPartitioner extends Partitioner { + + /** + * Gets a list of downstream processor task indexes to send message to. + * + * @param upstreamTaskIndex Current sender task's task index. + * + */ + def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] + + def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { + getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) + } +} + +sealed trait PartitionerFactory { + + def name: String + + def partitioner: Partitioner +} + +/** Stores the Partitioner in an object. To use it, user need to deserialize the object */ +class PartitionerObject(private[this] val _partitioner: Partitioner) + extends PartitionerFactory with Serializable { + + override def name: String = partitioner.getClass.getName + + override def partitioner: Partitioner = { + SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] + } +} + +/** Store the partitioner in class Name, the user need to instantiate a new class */ +class PartitionerByClassName(partitionerClass: String) + extends PartitionerFactory with Serializable { + + override def name: String = partitionerClass + override def partitioner: Partitioner = { + Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] + } +} + +/** + * @param partitionerFactory How we construct a Partitioner. + */ +case class PartitionerDescription(partitionerFactory: PartitionerFactory) + +object Partitioner { + val UNKNOWN_PARTITION_ID = -1 + + def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = { + PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala new file mode 100644 index 0000000..55ef614 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.partitioner + +import scala.util.Random + +import org.apache.gearpump.Message + +/** + * The idea of ShuffleGroupingPartitioner is derived from Storm. + * Messages are randomly distributed across the downstream's tasks in a way such that + * each task is guaranteed to get an equal number of messages. + */ +class ShuffleGroupingPartitioner extends UnicastPartitioner { + private val random = new Random + private var index = -1 + private var partitions = List.empty[Int] + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + index += 1 + if (partitions.isEmpty) { + partitions = 0.until(partitionNum).toList + partitions = random.shuffle(partitions) + } else if (index >= partitionNum) { + index = 0 + partitions = random.shuffle(partitions) + } + partitions(index) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala new file mode 100644 index 0000000..5c66d66 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.partitioner + +import java.util.Random + +import org.apache.gearpump.Message + +/** + * Round Robin partition the data to downstream processor tasks. + */ +class ShufflePartitioner extends UnicastPartitioner { + private var seed = 0 + private var count = 0 + + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + + if (seed == 0) { + seed = newSeed() + } + + val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum + count = count + 1 + result + } + + private def newSeed(): Int = new Random().nextInt() +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala b/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala new file mode 100644 index 0000000..497506b --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.security +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.gearpump.security.Authenticator.AuthenticationResult + +/** + * Authenticator for UI dashboard. + * + * Sub Class must implement a constructor with signature like this: + * this(config: Config) + */ +trait Authenticator { + + // TODO: Change the signature to return more attributes of user credentials... + def authenticate( + user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] +} + +object Authenticator { + + trait AuthenticationResult { + + def authenticated: Boolean + + def permissionLevel: Int + } + + val UnAuthenticated = new AuthenticationResult { + override val authenticated = false + override val permissionLevel = -1 + } + + /** Guest can view but have no permission to submit app or write */ + val Guest = new AuthenticationResult { + override val authenticated = true + override val permissionLevel = 1000 + } + + /** User can submit app, kill app, but have no permission to add or remote machines */ + val User = new AuthenticationResult { + override val authenticated = true + override val permissionLevel = 1000 + Guest.permissionLevel + } + + /** Super user */ + val Admin = new AuthenticationResult { + override val authenticated = true + override val permissionLevel = 1000 + User.permissionLevel + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala new file mode 100644 index 0000000..110fd8c --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.security + +import scala.concurrent.{ExecutionContext, Future} + +import com.typesafe.config.Config + +import org.apache.gearpump.security.Authenticator.AuthenticationResult +import org.apache.gearpump.security.ConfigFileBasedAuthenticator._ + +object ConfigFileBasedAuthenticator { + + private val ROOT = "gearpump.ui-security.config-file-based-authenticator" + private val ADMINS = ROOT + "." + "admins" + private val USERS = ROOT + "." + "users" + private val GUESTS = ROOT + "." + "guests" + + private case class Credentials( + admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) { + + def verify(user: String, password: String): AuthenticationResult = { + if (admins.contains(user)) { + if (verify(user, password, admins)) { + Authenticator.Admin + } else { + Authenticator.UnAuthenticated + } + } else if (users.contains(user)) { + if (verify(user, password, users)) { + Authenticator.User + } else { + Authenticator.UnAuthenticated + } + } else if (guests.contains(user)) { + if (verify(user, password, guests)) { + Authenticator.Guest + } else { + Authenticator.UnAuthenticated + } + } else { + Authenticator.UnAuthenticated + } + } + + private def verify(user: String, password: String, map: Map[String, String]): Boolean = { + val storedPass = map(user) + PasswordUtil.verify(password, storedPass) + } + } +} + +/** + * UI dashboard authenticator based on configuration file. + * + * It has three categories of users: admins, users, and guests. + * admins have unlimited permission, like shutdown a cluster, add/remove machines. + * users have limited permission to submit an application and etc.. + * guests can not submit/kill applications, but can view the application status. + * + * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find + * information about how to configure this authenticator. + * + * [Security consideration] + * It will keep one-way sha1 digest of password instead of password itself. The original password is + * NOT kept in any way, so generally it is safe. + * + * + * digesting flow (from original password to digest): + * {{{ + * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> + * base64Encode. + * }}} + * + * Verification user input password with stored digest: + * {{{ + * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest: + * salt + sha1 -> compare the generated digest with the stored digest. + * }}} + */ +class ConfigFileBasedAuthenticator(config: Config) extends Authenticator { + + private val credentials = loadCredentials(config) + + override def authenticate(user: String, password: String, ec: ExecutionContext) + : Future[AuthenticationResult] = { + implicit val ctx = ec + Future { + credentials.verify(user, password) + } + } + + private def loadCredentials(config: Config): Credentials = { + val admins = configToMap(config, ADMINS) + val users = configToMap(config, USERS) + val guests = configToMap(config, GUESTS) + new Credentials(admins, users, guests) + } + + private def configToMap(config: Config, path: String) = { + import scala.collection.JavaConverters._ + config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala b/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala new file mode 100644 index 0000000..25b68c6 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.security + +import java.security.MessageDigest +import scala.util.Try + +import sun.misc.{BASE64Decoder, BASE64Encoder} + +/** + * Util to verify whether user input password is valid or not. + * It use sha1 to do the digesting. + */ +object PasswordUtil { + private val SALT_LENGTH = 8 + + /** + * Verifies user input password with stored digest: + * {{{ + * base64Decode -> extract salt -> do sha1(salt, password) -> + * generate digest: salt + sha1 -> compare the generated digest with the stored digest. + * }}} + */ + def verify(password: String, stored: String): Boolean = { + Try { + val decoded = base64Decode(stored) + val salt = new Array[Byte](SALT_LENGTH) + Array.copy(decoded, 0, salt, 0, SALT_LENGTH) + + hash(password, salt) == stored + }.getOrElse(false) + } + /** + * digesting flow (from original password to digest): + * {{{ + * random salt byte array of length 8 -> + * byte array of (salt + sha1(salt, password)) -> base64Encode + * }}} + */ + def hash(password: String): String = { + // Salt generation 64 bits long + val salt = new Array[Byte](SALT_LENGTH) + new java.util.Random().nextBytes(salt) + hash(password, salt) + } + + private def hash(password: String, salt: Array[Byte]): String = { + val digest = MessageDigest.getInstance("SHA-1") + digest.reset() + digest.update(salt) + var input = digest.digest(password.getBytes("UTF-8")) + digest.reset() + input = digest.digest(input) + val withSalt = salt ++ input + base64Encode(withSalt) + } + + private def base64Encode(data: Array[Byte]): String = { + val endecoder = new BASE64Encoder() + endecoder.encode(data) + } + + private def base64Decode(data: String): Array[Byte] = { + val decoder = new BASE64Decoder() + decoder.decodeBuffer(data) + } + + // scalastyle:off println + private def help() = { + Console.println("usage: gear org.apache.gearpump.security.PasswordUtil -password " + + "<your password>") + } + + def main(args: Array[String]): Unit = { + if (args.length != 2 || args(0) != "-password") { + help() + } else { + val pass = args(1) + val result = hash(pass) + Console.println("Here is the hashed password") + Console.println("==============================") + Console.println(result) + } + } + // scalastyle:on println +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala new file mode 100644 index 0000000..6295faf --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.serializer + +import akka.actor.ExtendedActorSystem + +import org.apache.gearpump.cluster.UserConfig + +/** + * A build-in serializer framework using kryo + * + * NOTE: The Kryo here is a shaded version by Gearpump + */ +class FastKryoSerializationFramework extends SerializationFramework { + private var system: ExtendedActorSystem = null + + private lazy val pool = new ThreadLocal[Serializer]() { + override def initialValue(): Serializer = { + new FastKryoSerializer(system) + } + } + + override def init(system: ExtendedActorSystem, config: UserConfig): Unit = { + this.system = system + } + + override def get(): Serializer = { + pool.get() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala new file mode 100644 index 0000000..ed1e347 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.serializer + +import akka.actor.ExtendedActorSystem + +import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy +import io.gearpump.objenesis.strategy.StdInstantiatorStrategy +import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper +import org.apache.gearpump.serializer.FastKryoSerializer.KryoSerializationException +import org.apache.gearpump.util.LogUtil + +class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer { + + private val LOG = LogUtil.getLogger(getClass) + private val config = system.settings.config + + private val kryoSerializer = new KryoSerializerWrapper(system) + private val kryo = kryoSerializer.kryo + val strategy = new DefaultInstantiatorStrategy + strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy) + kryo.setInstantiatorStrategy(strategy) + private val kryoClazz = new GearpumpSerialization(config).customize(kryo) + + override def serialize(message: Any): Array[Byte] = { + try { + kryoSerializer.toBinary(message) + } catch { + case ex: java.lang.IllegalArgumentException => + val clazz = message.getClass + val error = s""" + | ${ex.getMessage} + |You can also register the class by providing a configuration with serializer + |defined, + | + |gearpump{ + | serializers { + | ## Follow this format when adding new serializer for new message types + | # "yourpackage.YourClass" = "yourpackage.YourSerializerForThisClass" + | + | ## If you intend to use default serializer for this class, then you can write this + | # "yourpackage.YourClass" = "" + | } + |} + | + |If you want to register the serializer globally, you need to change + |gear.conf on every worker in the cluster; if you only want to register + |the serializer for a single streaming application, you need to create + |a file under conf/ named application.conf, and add the above configuration + |into application.conf. To verify whether the configuration is effective, + |you can browser your UI http://{UI Server Host}:8090/api/v1.0/app/{appId}/config, + |and check whether your custom serializer is added. + """.stripMargin + + LOG.error(error, ex) + throw new KryoSerializationException(error, ex) + } + } + + override def deserialize(msg: Array[Byte]): Any = { + kryoSerializer.fromBinary(msg) + } +} + +object FastKryoSerializer { + class KryoSerializationException(msg: String, ex: Throwable = null) extends Exception(msg, ex) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala new file mode 100644 index 0000000..f9c6299 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.serializer + +import com.typesafe.config.Config +import org.slf4j.Logger + +import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} +import org.apache.gearpump.util.{Constants, LogUtil} + +class GearpumpSerialization(config: Config) { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + def customize(kryo: Kryo): Unit = { + + val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS) + + serializationMap.foreach { kv => + val (key, value) = kv + val keyClass = Class.forName(key) + + if (value == null || value.isEmpty) { + + // Use default serializer for this class type + kryo.register(keyClass) + } else { + val valueClass = Class.forName(value) + val register = kryo.register(keyClass, + valueClass.newInstance().asInstanceOf[KryoSerializer[_]]) + LOG.debug(s"Registering ${keyClass}, id: ${register.getId}") + } + } + kryo.setReferences(false) + + // Requires the user to register the class first before using + kryo.setRegistrationRequired(true) + } + + private final def configToMap(config: Config, path: String) = { + import scala.collection.JavaConverters._ + config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala new file mode 100644 index 0000000..995ba1f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.serializer + +import akka.actor.ExtendedActorSystem + +import org.apache.gearpump.cluster.UserConfig + +/** + * User are allowed to use a customized serialization framework by extending this + * interface. + */ +trait SerializationFramework { + def init(system: ExtendedActorSystem, config: UserConfig) + + /** + * Need to be thread safe + * + * Get a serializer to use. + * Note: this method can be called in a multi-thread environment. It's the + * responsibility of SerializationFramework Developer to assure this method + * is thread safe. + * + * To be thread-safe, one recommendation would be using a thread local pool + * to maintain reference to Serializer of same thread. + */ + def get(): Serializer +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala new file mode 100644 index 0000000..7c0f4bf --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.serializer + +/** + * User defined message serializer + */ +trait Serializer { + def serialize(message: Any): Array[Byte] + + def deserialize(msg: Array[Byte]): Any +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/Express.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/Express.scala b/core/src/main/scala/org/apache/gearpump/transport/Express.scala new file mode 100644 index 0000000..ef1e8d9 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/Express.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport + +import scala.collection.immutable.LongMap +import scala.concurrent._ + +import akka.actor._ +import akka.agent.Agent +import org.slf4j.Logger + +import org.apache.gearpump.transport.netty.Client.Close +import org.apache.gearpump.transport.netty.{TaskMessage, Context} +import org.apache.gearpump.util.LogUtil + +trait ActorLookupById { + + /** Lookup actor ref for local task actor by providing a TaskId (TaskId.toLong) */ + def lookupLocalActor(id: Long): Option[ActorRef] +} + +/** + * Custom networking layer. + * + * It will translate long sender/receiver address to shorter ones to reduce + * the network overhead. + */ +class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById { + + import system.dispatcher + + import org.apache.gearpump.transport.Express._ + val localActorMap = Agent(LongMap.empty[ActorRef]) + val remoteAddressMap = Agent(Map.empty[Long, HostPort]) + + val remoteClientMap = Agent(Map.empty[HostPort, ActorRef]) + + val conf = system.settings.config + + lazy val (context, serverPort, localHost) = init + + lazy val init = { + LOG.info(s"Start Express init ...${system.name}") + val context = new Context(system, conf) + val serverPort = context.bind("netty-server", this) + val localHost = HostPort(system.provider.getDefaultAddress.host.get, serverPort) + LOG.info(s"binding to netty server $localHost") + + system.registerOnTermination(new Runnable { + override def run(): Unit = context.close() + }) + (context, serverPort, localHost) + } + + def unregisterLocalActor(id: Long): Unit = { + localActorMap.sendOff(_ - id) + } + + /** Start Netty client actors to connect to remote machines */ + def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = { + val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet + closeClients(clientsToClose) + hostPorts.toList.foldLeft(Future(Map.empty[HostPort, ActorRef])) { (future, hostPort) => + remoteClientMap.alter { map => + if (!map.contains(hostPort)) { + val actor = context.connect(hostPort) + map + (hostPort -> actor) + } else { + map + } + } + } + } + + def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = { + remoteClientMap.alter { map => + map.filterKeys(hostPorts.contains).foreach { hostAndClient => + val (_, client) = hostAndClient + client ! Close + } + map -- hostPorts + } + } + + def registerLocalActor(id: Long, actor: ActorRef): Unit = { + LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}") + init + localActorMap.sendOff(_ + (id -> actor)) + } + + def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id) + + def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id) + + /** Send message to remote task */ + def transport(taskMessage: TaskMessage, remote: HostPort): Unit = { + + val remoteClient = remoteClientMap.get.get(remote) + if (remoteClient.isDefined) { + remoteClient.get.tell(taskMessage, Actor.noSender) + } else { + val errorMsg = s"Clients has not been launched properly before transporting messages, " + + s"the destination is $remote" + LOG.error(errorMsg) + throw new Exception(errorMsg) + } + } +} + +/** A customized transport layer by using Akka extension */ +object Express extends ExtensionId[Express] with ExtensionIdProvider { + val LOG: Logger = LogUtil.getLogger(getClass) + + override def get(system: ActorSystem): Express = super.get(system) + + override def lookup: ExtensionId[Express] = Express + + override def createExtension(system: ExtendedActorSystem): Express = new Express(system) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala b/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala new file mode 100644 index 0000000..d6dcd08 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport + +case class HostPort(host: String, port: Int) { + def toTuple: (String, Int) = { + (host, port) + } +} + +object HostPort { + def apply(address: String): HostPort = { + val hostAndPort = address.split(":") + new HostPort(hostAndPort(0), hostAndPort(1).toInt) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala new file mode 100644 index 0000000..506e11c --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty + +import java.net.{ConnectException, InetSocketAddress} +import java.nio.channels.ClosedChannelException +import java.util +import java.util.Random +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.FiniteDuration +import scala.language.implicitConversions + +import akka.actor.Actor +import org.jboss.netty.bootstrap.ClientBootstrap +import org.jboss.netty.channel._ +import org.slf4j.Logger + +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.LogUtil + +/** + * Netty Client implemented as an actor, on the other side, there is a netty server Actor. + * All messages sent to this actor will be forwarded to remote machine. + */ +class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor { + import org.apache.gearpump.transport.netty.Client._ + + val name = s"netty-client-$hostPort" + + private final var bootstrap: ClientBootstrap = null + private final val random: Random = new Random + private val serializer = conf.newTransportSerializer + private var channel: Channel = null + + var batch = new util.ArrayList[TaskMessage] + + private val init = { + bootstrap = NettyUtil.createClientBootStrap(factory, + new ClientPipelineFactory(name, conf), conf.buffer_size) + self ! Connect(0) + } + + def receive: Receive = messageHandler orElse connectionHandler + + def messageHandler: Receive = { + case msg: TaskMessage => + batch.add(msg) + case flush@Flush(flushChannel) => + if (channel != flushChannel) { + Unit // Drop, as it belong to old channel flush message + } else if (batch.size > 0 && flushChannel.isWritable) { + send(flushChannel, batch.iterator) + batch.clear() + self ! flush + } else { + import context.dispatcher + context.system.scheduler.scheduleOnce( + new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush) + } + } + + def connectionHandler: Receive = { + case ChannelReady(channel) => + this.channel = channel + self ! Flush(channel) + case Connect(tries) => + if (null == channel) { + connect(tries) + } else { + LOG.error("there already exist a channel, will not establish a new one...") + } + case CompareAndReconnectIfEqual(oldChannel) => + if (oldChannel == channel) { + channel = null + self ! Connect(0) + } + case Close => + close() + context.become(closed) + } + + def closed: Receive = { + case msg: AnyRef => + LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...") + } + + private def connect(tries: Int): Unit = { + LOG.info(s"netty client try to connect to $name, tries: $tries") + if (tries <= conf.max_retries) { + val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port) + val future = bootstrap.connect(remote_addr) + future success { current => + LOG.info(s"netty client successfully connectted to $name, tries: $tries") + self ! ChannelReady(current) + } fail { (current, ex) => + LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}") + current.close() + import context.dispatcher + context.system.scheduler.scheduleOnce( + new FiniteDuration( + getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1)) + } + } else { + LOG.error(s"fail to connect to a remote host $name after retied $tries ...") + self ! Close + } + } + + private def send(flushChannel: Channel, msgs: util.Iterator[TaskMessage]) { + var messageBatch: MessageBatch = null + + while (msgs.hasNext) { + val message: TaskMessage = msgs.next() + if (null == messageBatch) { + messageBatch = new MessageBatch(conf.messageBatchSize, serializer) + } + messageBatch.add(message) + if (messageBatch.isFull) { + val toBeFlushed: MessageBatch = messageBatch + flushRequest(flushChannel, toBeFlushed) + messageBatch = null + } + } + if (null != messageBatch && !messageBatch.isEmpty) { + flushRequest(flushChannel, messageBatch) + } + } + + private def close() { + LOG.info(s"closing netty client $name...") + if (null != channel) { + channel.close() + channel = null + } + batch = null + } + + override def postStop(): Unit = { + close() + } + + private def flushRequest(channel: Channel, requests: MessageBatch) { + val future: ChannelFuture = channel.write(requests) + future.fail { (channel, ex) => + if (channel.isOpen) { + channel.close + } + LOG.error(s"failed to send requests " + + s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}") + if (!ex.isInstanceOf[ClosedChannelException]) { + LOG.error(ex.getMessage, ex) + } + self ! CompareAndReconnectIfEqual(channel) + } + } + + private def getSleepTimeMs(retries: Int): Long = { + if (retries > 30) { + conf.max_sleep_ms + } else { + val backoff = 1 << retries + val sleepMs = conf.base_sleep_ms * Math.max(1, random.nextInt(backoff)) + if (sleepMs < conf.max_sleep_ms) sleepMs else conf.max_sleep_ms + } + } + + private def isChannelWritable = (null != channel) && channel.isWritable +} + +object Client { + val LOG: Logger = LogUtil.getLogger(getClass) + + // Reconnect if current channel equals channel + case class CompareAndReconnectIfEqual(channel: Channel) + + case class Connect(tries: Int) + case class ChannelReady(chanel: Channel) + case object Close + + case class Flush(channel: Channel) + + class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler { + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) { + event.getCause match { + case ex: ConnectException => Unit + case ex: ClosedChannelException => + LOG.warn("exception found when trying to close netty connection", ex.getMessage) + case ex => LOG.error("Connection failed " + name, ex) + } + } + } + + class ClientPipelineFactory(name: String, conf: NettyConfig) extends ChannelPipelineFactory { + def getPipeline: ChannelPipeline = { + val pipeline: ChannelPipeline = Channels.pipeline + pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer)) + pipeline.addLast("encoder", new MessageEncoder) + pipeline.addLast("handler", new ClientErrorHandler(name)) + pipeline + } + } + + implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = { + new ChannelFutureOps(channel) + } + + class ChannelFutureOps(channelFuture: ChannelFuture) { + + def success(handler: (Channel => Unit)): ChannelFuture = { + channelFuture.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isSuccess) { + handler(future.getChannel) + } + } + }) + channelFuture + } + + def fail(handler: ((Channel, Throwable) => Unit)): ChannelFuture = { + channelFuture.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (!future.isSuccess) { + handler(future.getChannel, future.getCause) + } + } + }) + channelFuture + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala new file mode 100644 index 0000000..bc19960 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty + +import java.io.Closeable +import java.util.concurrent._ + +import scala.collection.JavaConverters._ + +import akka.actor.{ActorRef, ActorSystem, Props} +import com.typesafe.config.Config +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.slf4j.Logger + +import org.apache.gearpump.transport.netty.Server.ServerPipelineFactory +import org.apache.gearpump.transport.{ActorLookupById, HostPort} +import org.apache.gearpump.util.{Constants, LogUtil} + +object Context { + private final val LOG: Logger = LogUtil.getLogger(getClass) +} + +/** Netty Context */ +class Context(system: ActorSystem, conf: NettyConfig) extends IContext { + import org.apache.gearpump.transport.netty.Context._ + + def this(system: ActorSystem, conf: Config) { + this(system, new NettyConfig(conf)) + } + + private val closeHandler = new ConcurrentLinkedQueue[Closeable]() + private val nettyDispatcher = system.settings.config.getString(Constants.NETTY_DISPATCHER) + val maxWorkers: Int = 1 + + private lazy val clientChannelFactory: NioClientSocketChannelFactory = { + val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-boss") + val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-worker") + val channelFactory = + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(bossFactory), + Executors.newCachedThreadPool(workerFactory), maxWorkers) + + closeHandler.add(new Closeable { + override def close(): Unit = { + LOG.info("Closing all client resources....") + channelFactory.releaseExternalResources + } + }) + channelFactory + } + + def bind( + name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true, + inputPort: Int = 0): Int = { + // TODO: whether we should expose it as application config? + val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor, + deserializeFlag).withDispatcher(nettyDispatcher), name) + val (port, channel) = NettyUtil.newNettyServer(name, + new ServerPipelineFactory(server, conf), 5242880, inputPort) + val factory = channel.getFactory + closeHandler.add(new Closeable { + override def close(): Unit = { + system.stop(server) + channel.close() + LOG.info("Closing all server resources....") + factory.releaseExternalResources + } + }) + port + } + + def connect(hostPort: HostPort): ActorRef = { + val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort) + .withDispatcher(nettyDispatcher)) + closeHandler.add(new Closeable { + override def close(): Unit = { + LOG.info("closing Client actor....") + system.stop(client) + } + }) + + client + } + + /** + * terminate this context + */ + def close(): Unit = { + + LOG.info(s"Context.term, cleanup resources...., " + + s"we have ${closeHandler.size()} items to close...") + + // Cleans up resource in reverse order so that client actor can be cleaned + // before clientChannelFactory + closeHandler.iterator().asScala.toList.reverse.foreach(_.close()) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala new file mode 100644 index 0000000..dae7b3a --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty + +import akka.actor.ActorRef + +import org.apache.gearpump.transport.{ActorLookupById, HostPort} + +trait IContext { + + /** + * Create a Netty server connection. + */ + def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int + + /** + * Create a Netty client actor + */ + def connect(hostPort: HostPort): ActorRef + + /** + * Close resource for this context + */ + def close() +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala new file mode 100644 index 0000000..03f759e --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty + +import com.typesafe.config.Config + +import org.apache.gearpump.util.Constants + +class NettyConfig(conf: Config) { + + val buffer_size = conf.getInt(Constants.NETTY_BUFFER_SIZE) + val max_retries = conf.getInt(Constants.NETTY_MAX_RETRIES) + val base_sleep_ms = conf.getInt(Constants.NETTY_BASE_SLEEP_MS) + val max_sleep_ms = conf.getInt(Constants.NETTY_MAX_SLEEP_MS) + val messageBatchSize = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE) + val flushCheckInterval = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL) + + def newTransportSerializer: ITransportMessageSerializer = { + Class.forName( + conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER)) + .newInstance().asInstanceOf[ITransportMessageSerializer] + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala new file mode 100644 index 0000000..ddb0afe --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty + +import java.net.InetSocketAddress +import java.util.concurrent.{Executors, ThreadFactory} + +import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap} +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory +import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory} + +object NettyUtil { + + def newNettyServer( + name: String, + pipelineFactory: ChannelPipelineFactory, + buffer_size: Int, + inputPort: Int = 0): (Int, Channel) = { + val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-boss") + val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-worker") + val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), + Executors.newCachedThreadPool(workerFactory), 1) + + val bootstrap = createServerBootStrap(factory, pipelineFactory, buffer_size) + val channel: Channel = bootstrap.bind(new InetSocketAddress(inputPort)) + val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort() + (port, channel) + } + + def createServerBootStrap( + factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int) + : ServerBootstrap = { + val bootstrap = new ServerBootstrap(factory) + bootstrap.setOption("child.tcpNoDelay", true) + bootstrap.setOption("child.receiveBufferSize", buffer_size) + bootstrap.setOption("child.keepAlive", true) + bootstrap.setPipelineFactory(pipelineFactory) + bootstrap + } + + def createClientBootStrap( + factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int) + : ClientBootstrap = { + val bootstrap = new ClientBootstrap(factory) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("sendBufferSize", buffer_size) + bootstrap.setOption("keepAlive", true) + bootstrap.setPipelineFactory(pipelineFactory) + bootstrap + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala new file mode 100644 index 0000000..8d39795 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.immutable.IntMap +import scala.concurrent.Future + +import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem} +import org.jboss.netty.channel._ +import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup} +import org.slf4j.Logger + +import org.apache.gearpump.transport.ActorLookupById +import org.apache.gearpump.util.{AkkaHelper, LogUtil} + +/** Netty server actor, message received will be forward to the target on the address line. */ +class Server( + name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean) + extends Actor { + + private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name) + import org.apache.gearpump.transport.netty.Server._ + + val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server") + + val system = context.system.asInstanceOf[ExtendedActorSystem] + + def receive: Receive = msgHandler orElse channelManager + // As we will only transfer TaskId on the wire, + // this object will translate taskId to or from ActorRef + private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context) + + def channelManager: Receive = { + case AddChannel(channel) => allChannels.add(channel) + case CloseChannel(channel) => + import context.dispatcher + Future { + channel.close.awaitUninterruptibly + allChannels.remove(channel) + } + } + + def msgHandler: Receive = { + case MsgBatch(msgs) => + msgs.asScala.groupBy(_.targetTask()).foreach { taskBatch => + val (taskId, taskMessages) = taskBatch + val actor = lookupActor.lookupLocalActor(taskId) + + if (actor.isEmpty) { + LOG.error(s"Cannot find actor for id: $taskId...") + } else taskMessages.foreach { taskMessage => + actor.get.tell(taskMessage.message(), + taskIdActorRefTranslation.translateToActorRef(taskMessage.sessionId())) + } + } + } + + override def postStop(): Unit = { + allChannels.close.awaitUninterruptibly + } +} + +object Server { + + class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory { + def getPipeline: ChannelPipeline = { + val pipeline: ChannelPipeline = Channels.pipeline + pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer)) + pipeline.addLast("encoder", new MessageEncoder) + pipeline.addLast("handler", new ServerHandler(server)) + pipeline + } + } + + class ServerHandler(server: ActorRef) extends SimpleChannelUpstreamHandler { + private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = server.path.name) + + override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { + server ! AddChannel(e.getChannel) + } + + override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { + val msgs: util.List[TaskMessage] = e.getMessage.asInstanceOf[util.List[TaskMessage]] + if (msgs != null) { + server ! MsgBatch(msgs) + } + } + + override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) { + LOG.error("server errors in handling the request", e.getCause) + server ! CloseChannel(e.getChannel) + } + } + + class TaskIdActorRefTranslation(context: ActorContext) { + private var taskIdtoActorRef = IntMap.empty[ActorRef] + + /** 1-1 mapping from session id to fake ActorRef */ + def translateToActorRef(sessionId: Int): ActorRef = { + if (!taskIdtoActorRef.contains(sessionId)) { + + // A fake ActorRef for performance optimization. + val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId") + taskIdtoActorRef += sessionId -> actorRef + } + taskIdtoActorRef.get(sessionId).get + } + } + + case class AddChannel(channel: Channel) + + case class CloseChannel(channel: Channel) + + case class MsgBatch(messages: java.lang.Iterable[TaskMessage]) + +} \ No newline at end of file
