Repository: incubator-s2graph
Updated Branches:
  refs/heads/master e71264d0d -> b6fe32fc2
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala b/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala deleted file mode 100644 index a387ba5..0000000 --- a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala +++ /dev/null @@ -1,46 +0,0 @@ -package benchmark - -import play.api.libs.json.JsNumber -import play.api.test.{FakeApplication, PlaySpecification, WithApplication} -import play.libs.Json - -class JsonBenchmarkSpec extends BenchmarkCommon { - "to json" >> { - "json benchmark" >> { - - duration("map to json") { - (0 to 10) foreach { n => - val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap - Json.toJson(numberMaps) - } - } - - duration("directMakeJson") { - (0 to 10) foreach { n => - var jsObj = play.api.libs.json.Json.obj() - (0 to 10).foreach { n => - jsObj += (n.toString -> JsNumber(n * n)) - } - } - } - - duration("map to json 2") { - (0 to 50) foreach { n => - val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap - Json.toJson(numberMaps) - } - } - - duration("directMakeJson 2") { - (0 to 50) foreach { n => - var jsObj = play.api.libs.json.Json.obj() - (0 to 10).foreach { n => - jsObj += (n.toString -> JsNumber(n * n)) - } - } - } - true - } - true - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala b/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala deleted file mode 100644 index d2d3624..0000000 --- a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala +++ /dev/null @@ -1,100 +0,0 @@ -package benchmark - -import com.kakao.s2graph.core.OrderingUtil._ -import com.kakao.s2graph.core.SeqMultiOrdering -import play.api.libs.json.{JsNumber, JsValue} -import play.api.test.PlaySpecification -import play.api.{Application => PlayApplication} - -import scala.util.Random - -class OrderingUtilBenchmarkSpec extends BenchmarkCommon { - "OrderingUtilBenchmarkSpec" should { - - "performance MultiOrdering any" >> { - val tupLs = (0 until 10) map { i => - Random.nextDouble() -> Random.nextLong() - } - - val seqLs = tupLs.map { tup => - Seq(tup._1, tup._2) - } - - val sorted1 = duration("TupleOrdering double,long") { - (0 until 1000) foreach { _ => - tupLs.sortBy { case (x, y) => - -x -> -y - } - } - tupLs.sortBy { case (x, y) => - -x -> -y - } - }.map { x => x._1 } - - val sorted2 = duration("MultiOrdering double,long") { - (0 until 1000) foreach { _ => - seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false))) - } - seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false))) - }.map { x => x.head } - - sorted1.toString() must_== sorted2.toString() - } - - "performance MultiOrdering double" >> { - val tupLs = (0 until 50) map { i => - Random.nextDouble() -> Random.nextDouble() - } - - val seqLs = tupLs.map { tup => - Seq(tup._1, tup._2) - } - - duration("MultiOrdering double") { - (0 until 1000) foreach { _ => - seqLs.sorted(new SeqMultiOrdering[Double](Seq(false, false))) - } - } - - duration("TupleOrdering double") { - (0 until 1000) foreach { _ => - tupLs.sortBy { case (x, y) => - -x -> -y - } - } - } - - 1 must_== 1 - } - - "performance MultiOrdering jsvalue" >> { - val tupLs = (0 until 50) map { i => 
- Random.nextDouble() -> Random.nextLong() - } - - val seqLs = tupLs.map { tup => - Seq(JsNumber(tup._1), JsNumber(tup._2)) - } - - val sorted1 = duration("TupleOrdering double,long") { - (0 until 1000) foreach { _ => - tupLs.sortBy { case (x, y) => - -x -> -y - } - } - tupLs.sortBy { case (x, y) => - -x -> -y - } - } - - val sorted2 = duration("MultiOrdering jsvalue") { - (0 until 1000) foreach { _ => - seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false))) - } - seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false))) - } - - 1 must_== 1 - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala b/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala deleted file mode 100644 index 0c27a2a..0000000 --- a/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -package benchmark -import play.api.test.{FakeApplication, PlaySpecification, WithApplication} -import scala.annotation.tailrec -import scala.util.Random - -class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification { - "sample" should { - implicit val app = FakeApplication() - - "sample benchmark" in new WithApplication(app) { - @tailrec - def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (set.size == n) set - else randomInt(n, range, set + Random.nextInt(range)) - } - - // sample using random array - def randomArraySample[T](num: Int, ls: List[T]): List[T] = { - val randomNum = randomInt(num, ls.size) - var sample = List.empty[T] - var idx = 0 - ls.foreach { e => - if (randomNum.contains(idx)) sample = e :: sample - idx += 1 - } - sample - } - - // sample using shuffle - def shuffleSample[T](num: Int, ls: List[T]): List[T] = { - Random.shuffle(ls).take(num) - } - - // sample using random number generation - def rngSample[T](num: Int, ls: List[T]): List[T] = { - var sampled = List.empty[T] - val N = ls.size // population - var t = 0 // total input records dealt with - var m = 0 // number of items selected so far - - while (m < num) { - val u = Random.nextDouble() - if ( (N - t)*u < num - m) { - sampled = ls(t) :: sampled - m += 1 - } - t += 1 - } - sampled - } - - // test data - val testLimit = 500000 - val testNum = 10 - val testData = (0 to 1000).toList - - // dummy for warm-up - (0 to testLimit) foreach { n => - randomArraySample(testNum, testData) - shuffleSample(testNum, testData) - rngSample(testNum, testData) - } - - duration("Random Array Sampling") { - (0 to testLimit) foreach { _ => - val sampled = randomArraySample(testNum, testData) - } - } - - duration("Shuffle Sampling") { - (0 to testLimit) foreach { _ => - val sampled = shuffleSample(testNum, testData) - } - } - - duration("RNG Sampling") { - (0 to testLimit) foreach { _ => - val sampled = rngSample(testNum, testData) - } - } - } - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/controllers/PostProcessSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/controllers/PostProcessSpec.scala b/s2rest_play/test/controllers/PostProcessSpec.scala deleted file mode 100644 index cea132a..0000000 --- a/s2rest_play/test/controllers/PostProcessSpec.scala +++ /dev/null @@ -1,112 +0,0 @@ -package controllers - -import com.kakao.s2graph.core.{OrderingUtil, 
SeqMultiOrdering} -import play.api.libs.json.{JsNumber, JsString, JsValue} -import play.api.test.PlaySpecification - -class PostProcessSpec extends PlaySpecification { - import OrderingUtil._ - - "test order by json" >> { - val jsLs: Seq[Seq[JsValue]] = Seq( - Seq(JsNumber(0), JsString("a")), - Seq(JsNumber(0), JsString("b")), - Seq(JsNumber(1), JsString("a")), - Seq(JsNumber(1), JsString("b")), - Seq(JsNumber(2), JsString("c")) - ) - - // number descending, string ascending - val sortedJsLs: Seq[Seq[JsValue]] = Seq( - Seq(JsNumber(2), JsString("c")), - Seq(JsNumber(1), JsString("a")), - Seq(JsNumber(1), JsString("b")), - Seq(JsNumber(0), JsString("a")), - Seq(JsNumber(0), JsString("b")) - ) - - val orderParam: Seq[Boolean] = Seq(false, true) - val resultJsLs = jsLs.sorted(new Ordering[Seq[JsValue]] { - override def compare(x: Seq[JsValue], y: Seq[JsValue]): Int = { - val xe = x.iterator - val ye = y.iterator - val oe = orderParam.iterator - - while (xe.hasNext && ye.hasNext && oe.hasNext) { - val (xev, yev) = oe.next() match { - case true => xe.next() -> ye.next() - case false => ye.next() -> xe.next() - } - val res = (xev, yev) match { - case (JsNumber(xv), JsNumber(yv)) => - Ordering[BigDecimal].compare(xv, yv) - case (JsString(xv), JsString(yv)) => - Ordering[String].compare(xv, yv) - case _ => throw new Exception("type mismatch") - } - if (res != 0) return res - } - - Ordering.Boolean.compare(xe.hasNext, ye.hasNext) - } - }) - - resultJsLs.toString() must_== sortedJsLs.toString - } - - "test order by primitive type" >> { - val jsLs: Seq[Seq[Any]] = Seq( - Seq(0, "a"), - Seq(0, "b"), - Seq(1, "a"), - Seq(1, "b"), - Seq(2, "c") - ) - - // number descending, string ascending - val sortedJsLs: Seq[Seq[Any]] = Seq( - Seq(2, "c"), - Seq(1, "a"), - Seq(1, "b"), - Seq(0, "a"), - Seq(0, "b") - ) - - val ascendingLs: Seq[Boolean] = Seq(false, true) - val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs)) - - resultJsLs.toString() must_== sortedJsLs.toString - } - - "test order by primitive type with short ascending list" >> { - val jsLs: Seq[Seq[Any]] = Seq( - Seq(0, "a"), - Seq(1, "b"), - Seq(0, "b"), - Seq(1, "a"), - Seq(2, "c"), - Seq(1, "c"), - Seq(1, "d"), - Seq(1, "f"), - Seq(1, "e") - ) - - // number descending, string ascending(default) - val sortedJsLs: Seq[Seq[Any]] = Seq( - Seq(2, "c"), - Seq(1, "a"), - Seq(1, "b"), - Seq(1, "c"), - Seq(1, "d"), - Seq(1, "e"), - Seq(1, "f"), - Seq(0, "a"), - Seq(0, "b") - ) - - val ascendingLs: Seq[Boolean] = Seq(false) - val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs)) - - resultJsLs.toString() must_== sortedJsLs.toString - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala new file mode 100644 index 0000000..3d77e16 --- /dev/null +++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala @@ -0,0 +1,24 @@ +package org.apache.s2graph.rest.play.benchmark + +import org.specs2.mutable.Specification + +trait BenchmarkCommon extends Specification { + val wrapStr = s"\n==================================================" + + def duration[T](prefix: String = "")(block: => T) = { + val startTs = System.currentTimeMillis() + val ret = block + val endTs = 
System.currentTimeMillis() + println(s"$wrapStr\n$prefix: took ${endTs - startTs} ms$wrapStr") + ret + } + + def durationWithReturn[T](prefix: String = "")(block: => T): (T, Long) = { + val startTs = System.currentTimeMillis() + val ret = block + val endTs = System.currentTimeMillis() + val duration = endTs - startTs +// println(s"$wrapStr\n$prefix: took $duration ms$wrapStr") + (ret, duration) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala new file mode 100644 index 0000000..52ef4c5 --- /dev/null +++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala @@ -0,0 +1,124 @@ +package org.apache.s2graph.rest.play.benchmark + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.core.types.{HBaseType, InnerVal, SourceVertexId} +import play.api.test.{FakeApplication, PlaySpecification} + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +class GraphUtilSpec extends BenchmarkCommon with PlaySpecification { + + def between(bytes: Array[Byte], startKey: Array[Byte], endKey: Array[Byte]): Boolean = + Bytes.compareTo(startKey, bytes) <= 0 && Bytes.compareTo(endKey, bytes) >= 0 + + def betweenShort(value: Short, start: Short, end: Short): Boolean = + start <= value && value <= end + + + "GraphUtil" should { + "test murmur3 hash function distribution" in { + val testNum = 1000000 + val bucketSize = Short.MaxValue / 40 + val countsNew = new mutable.HashMap[Int, Int]() + val counts = new mutable.HashMap[Int, Int]() + for { + i <- (0 until testNum) + } { + val h = GraphUtil.murmur3(i.toString) / bucketSize + val hNew = GraphUtil.murmur3Int(i.toString) / bucketSize + counts += (h -> (counts.getOrElse(h, 0) + 1)) + countsNew += (hNew -> (countsNew.getOrElse(hNew, 0) + 1)) + } + val all = counts.toList.sortBy { case (bucket, count) => count }.reverse + val allNew = countsNew.toList.sortBy { case (bucket, count) => count }.reverse + val top = all.take(10) + val bottom = all.takeRight(10) + val topNew = allNew.take(10) + val bottomNew = allNew.takeRight(10) + println(s"Top: $top") + println(s"Bottom: $bottom") + println("-" * 50) + println(s"TopNew: $topNew") + println(s"Bottom: $bottomNew") + true + } + + "test murmur hash skew2" in { + running(FakeApplication()) { + import HBaseType._ + val testNum = 1000000L + val regionCount = 40 + val window = Int.MaxValue / regionCount + val rangeBytes = new ListBuffer[(List[Byte], List[Byte])]() + for { + i <- (0 until regionCount) + } yield { + val startKey = Bytes.toBytes(i * window) + val endKey = Bytes.toBytes((i + 1) * window) + rangeBytes += (startKey.toList -> endKey.toList) + } + + + + val stats = new collection.mutable.HashMap[Int, ((List[Byte], List[Byte]), Long)]() + val counts = new collection.mutable.HashMap[Short, Long]() + stats += (0 -> (rangeBytes.head -> 0L)) + + for (i <- (0L until testNum)) { + val vertexId = SourceVertexId(DEFAULT_COL_ID, InnerVal.withLong(i, HBaseType.DEFAULT_VERSION)) + val bytes = vertexId.bytes + val shortKey = GraphUtil.murmur3(vertexId.innerId.toIdString()) + val shortVal = counts.getOrElse(shortKey, 0L) + 1L + counts += (shortKey -> shortVal) + var j = 0 + var found = false + 
while (j < rangeBytes.size && !found) { + val (start, end) = rangeBytes(j) + if (between(bytes, start.toArray, end.toArray)) { + found = true + } + j += 1 + } + val head = rangeBytes(j - 1) + val key = j - 1 + val value = stats.get(key) match { + case None => 0L + case Some(v) => v._2 + 1 + } + stats += (key -> (head, value)) + } + val sorted = stats.toList.sortBy(kv => kv._2._2).reverse + println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount") + sorted.foreach { case (idx, ((start, end), cnt)) => + val startShort = Bytes.toShort(start.take(2).toArray) + val endShort = Bytes.toShort(end.take(2).toArray) + val count = counts.count(t => startShort <= t._1 && t._1 < endShort) + println(s"$idx: $start ~ $end\t${start.take(2)} ~ ${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count") + + } + println("\n" * 10) + println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount") + stats.toList.sortBy(kv => kv._1).reverse.foreach { case (idx, ((start, end), cnt)) => + val startShort = Bytes.toShort(start.take(2).toArray) + val endShort = Bytes.toShort(end.take(2).toArray) + val count = counts.count(t => startShort <= t._1 && t._1 < endShort) + println(s"$idx: $start ~ $end\t${start.take(2)} ~ ${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count") + + } + } + true + } + + "Bytes compareTo" in { + val x = Array[Byte](11, -12, -26, -14, -23) + val startKey = Array[Byte](0, 0, 0, 0) + val endKey = Array[Byte](12, -52, -52, -52) + println(Bytes.compareTo(startKey, x)) + println(Bytes.compareTo(endKey, x)) + true + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala new file mode 100644 index 0000000..ee12b8d --- /dev/null +++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala @@ -0,0 +1,45 @@ +package org.apache.s2graph.rest.play.benchmark + +import play.api.libs.json.JsNumber +import play.libs.Json + +class JsonBenchmarkSpec extends BenchmarkCommon { + "to json" >> { + "json benchmark" >> { + + duration("map to json") { + (0 to 10) foreach { n => + val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap + Json.toJson(numberMaps) + } + } + + duration("directMakeJson") { + (0 to 10) foreach { n => + var jsObj = play.api.libs.json.Json.obj() + (0 to 10).foreach { n => + jsObj += (n.toString -> JsNumber(n * n)) + } + } + } + + duration("map to json 2") { + (0 to 50) foreach { n => + val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap + Json.toJson(numberMaps) + } + } + + duration("directMakeJson 2") { + (0 to 50) foreach { n => + var jsObj = play.api.libs.json.Json.obj() + (0 to 10).foreach { n => + jsObj += (n.toString -> JsNumber(n * n)) + } + } + } + true + } + true + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala 
b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala new file mode 100644 index 0000000..cc194d7 --- /dev/null +++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala @@ -0,0 +1,98 @@ +package org.apache.s2graph.rest.play.benchmark + +import org.apache.s2graph.core.OrderingUtil._ +import org.apache.s2graph.core.{OrderingUtil, SeqMultiOrdering} +import play.api.libs.json.{JsNumber, JsValue} + +import scala.util.Random + +class OrderingUtilBenchmarkSpec extends BenchmarkCommon { + "OrderingUtilBenchmarkSpec" should { + + "performance MultiOrdering any" >> { + val tupLs = (0 until 10) map { i => + Random.nextDouble() -> Random.nextLong() + } + + val seqLs = tupLs.map { tup => + Seq(tup._1, tup._2) + } + + val sorted1 = duration("TupleOrdering double,long") { + (0 until 1000) foreach { _ => + tupLs.sortBy { case (x, y) => + -x -> -y + } + } + tupLs.sortBy { case (x, y) => + -x -> -y + } + }.map { x => x._1 } + + val sorted2 = duration("MultiOrdering double,long") { + (0 until 1000) foreach { _ => + seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false))) + } + seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false))) + }.map { x => x.head } + + sorted1.toString() must_== sorted2.toString() + } + + "performance MultiOrdering double" >> { + val tupLs = (0 until 50) map { i => + Random.nextDouble() -> Random.nextDouble() + } + + val seqLs = tupLs.map { tup => + Seq(tup._1, tup._2) + } + + duration("MultiOrdering double") { + (0 until 1000) foreach { _ => + seqLs.sorted(new SeqMultiOrdering[Double](Seq(false, false))) + } + } + + duration("TupleOrdering double") { + (0 until 1000) foreach { _ => + tupLs.sortBy { case (x, y) => + -x -> -y + } + } + } + + 1 must_== 1 + } + + "performance MultiOrdering jsvalue" >> { + val tupLs = (0 until 50) map { i => + Random.nextDouble() -> Random.nextLong() + } + + val seqLs = tupLs.map { tup => + Seq(JsNumber(tup._1), JsNumber(tup._2)) + } + + val sorted1 = duration("TupleOrdering double,long") { + (0 until 1000) foreach { _ => + tupLs.sortBy { case (x, y) => + -x -> -y + } + } + tupLs.sortBy { case (x, y) => + -x -> -y + } + } + + val sorted2 = duration("MultiOrdering jsvalue") { + (0 until 1000) foreach { _ => + seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false))) + } + seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false))) + } + + 1 must_== 1 + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala new file mode 100644 index 0000000..2db95e4 --- /dev/null +++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala @@ -0,0 +1,87 @@ +package org.apache.s2graph.rest.play.benchmark + +import play.api.test.{FakeApplication, PlaySpecification, WithApplication} + +import scala.annotation.tailrec +import scala.util.Random + +class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification { + "sample" should { + implicit val app = FakeApplication() + + "sample benchmark" in new WithApplication(app) { + @tailrec + def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { + if (set.size == n) set + else randomInt(n, range, set + Random.nextInt(range)) + } + 
+ // sample using random array + def randomArraySample[T](num: Int, ls: List[T]): List[T] = { + val randomNum = randomInt(num, ls.size) + var sample = List.empty[T] + var idx = 0 + ls.foreach { e => + if (randomNum.contains(idx)) sample = e :: sample + idx += 1 + } + sample + } + + // sample using shuffle + def shuffleSample[T](num: Int, ls: List[T]): List[T] = { + Random.shuffle(ls).take(num) + } + + // sample using random number generation + def rngSample[T](num: Int, ls: List[T]): List[T] = { + var sampled = List.empty[T] + val N = ls.size // population + var t = 0 // total input records dealt with + var m = 0 // number of items selected so far + + while (m < num) { + val u = Random.nextDouble() + if ( (N - t)*u < num - m) { + sampled = ls(t) :: sampled + m += 1 + } + t += 1 + } + sampled + } + + // test data + val testLimit = 500000 + val testNum = 10 + val testData = (0 to 1000).toList + + // dummy for warm-up + (0 to testLimit) foreach { n => + randomArraySample(testNum, testData) + shuffleSample(testNum, testData) + rngSample(testNum, testData) + } + + duration("Random Array Sampling") { + (0 to testLimit) foreach { _ => + val sampled = randomArraySample(testNum, testData) + } + } + + duration("Shuffle Sampling") { + (0 to testLimit) foreach { _ => + val sampled = shuffleSample(testNum, testData) + } + } + + duration("RNG Sampling") { + (0 to testLimit) foreach { _ => + val sampled = rngSample(testNum, testData) + } + } + } + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala new file mode 100644 index 0000000..998aef8 --- /dev/null +++ b/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala @@ -0,0 +1,112 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.s2graph.core.{OrderingUtil, SeqMultiOrdering} +import play.api.libs.json.{JsNumber, JsString, JsValue} +import play.api.test.PlaySpecification + +class PostProcessSpec extends PlaySpecification { + import OrderingUtil._ + + "test order by json" >> { + val jsLs: Seq[Seq[JsValue]] = Seq( + Seq(JsNumber(0), JsString("a")), + Seq(JsNumber(0), JsString("b")), + Seq(JsNumber(1), JsString("a")), + Seq(JsNumber(1), JsString("b")), + Seq(JsNumber(2), JsString("c")) + ) + + // number descending, string ascending + val sortedJsLs: Seq[Seq[JsValue]] = Seq( + Seq(JsNumber(2), JsString("c")), + Seq(JsNumber(1), JsString("a")), + Seq(JsNumber(1), JsString("b")), + Seq(JsNumber(0), JsString("a")), + Seq(JsNumber(0), JsString("b")) + ) + + val orderParam: Seq[Boolean] = Seq(false, true) + val resultJsLs = jsLs.sorted(new Ordering[Seq[JsValue]] { + override def compare(x: Seq[JsValue], y: Seq[JsValue]): Int = { + val xe = x.iterator + val ye = y.iterator + val oe = orderParam.iterator + + while (xe.hasNext && ye.hasNext && oe.hasNext) { + val (xev, yev) = oe.next() match { + case true => xe.next() -> ye.next() + case false => ye.next() -> xe.next() + } + val res = (xev, yev) match { + case (JsNumber(xv), JsNumber(yv)) => + Ordering[BigDecimal].compare(xv, yv) + case (JsString(xv), JsString(yv)) => + Ordering[String].compare(xv, yv) + case _ => throw new Exception("type mismatch") + } + if (res != 0) return res + } + + 
Ordering.Boolean.compare(xe.hasNext, ye.hasNext) + } + }) + + resultJsLs.toString() must_== sortedJsLs.toString + } + + "test order by primitive type" >> { + val jsLs: Seq[Seq[Any]] = Seq( + Seq(0, "a"), + Seq(0, "b"), + Seq(1, "a"), + Seq(1, "b"), + Seq(2, "c") + ) + + // number descending, string ascending + val sortedJsLs: Seq[Seq[Any]] = Seq( + Seq(2, "c"), + Seq(1, "a"), + Seq(1, "b"), + Seq(0, "a"), + Seq(0, "b") + ) + + val ascendingLs: Seq[Boolean] = Seq(false, true) + val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs)) + + resultJsLs.toString() must_== sortedJsLs.toString + } + + "test order by primitive type with short ascending list" >> { + val jsLs: Seq[Seq[Any]] = Seq( + Seq(0, "a"), + Seq(1, "b"), + Seq(0, "b"), + Seq(1, "a"), + Seq(2, "c"), + Seq(1, "c"), + Seq(1, "d"), + Seq(1, "f"), + Seq(1, "e") + ) + + // number descending, string ascending(default) + val sortedJsLs: Seq[Seq[Any]] = Seq( + Seq(2, "c"), + Seq(1, "a"), + Seq(1, "b"), + Seq(1, "c"), + Seq(1, "d"), + Seq(1, "e"), + Seq(1, "f"), + Seq(0, "a"), + Seq(0, "b") + ) + + val ascendingLs: Seq[Boolean] = Seq(false) + val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs)) + + resultJsLs.toString() must_== sortedJsLs.toString + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala b/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala new file mode 100644 index 0000000..99d3ba4 --- /dev/null +++ b/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala @@ -0,0 +1,20 @@ +package org.apache.s2graph.spark.config + +import com.typesafe.config.{Config, ConfigFactory} + +object S2ConfigFactory { + lazy val config: Config = _load + + @deprecated("do not call explicitly. use config", "0.0.6") + def load(): Config = { + _load + } + + def _load: Config = { + // default configuration file name : application.conf + val sysConfig = ConfigFactory.parseProperties(System.getProperties) + + lazy val phase = if (!sysConfig.hasPath("phase")) "alpha" else sysConfig.getString("phase") + sysConfig.withFallback(ConfigFactory.parseResourcesAnySyntax(s"$phase.conf")).withFallback(ConfigFactory.load()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala new file mode 100644 index 0000000..b8e73ca --- /dev/null +++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala @@ -0,0 +1,55 @@ +package org.apache.s2graph.spark.spark + +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.{AccumulableParam, SparkConf} + +import scala.collection.mutable.{HashMap => MutableHashMap} + +/* + * Allows a mutable HashMap[String, Int] to be used as an accumulator in Spark. + * Whenever we try to put (k, v2) into an accumulator that already contains (k, v1), the result + * will be a HashMap containing (k, v1 + v2). + * + * Would have been nice to extend GrowableAccumulableParam instead of redefining everything, but it's + * private to the spark package. 
+ */ + +class HashMapParam[K, V](op: (V, V) => V) extends AccumulableParam[MutableHashMap[K, V], (K, V)] { + type MapType = MutableHashMap[K, V] + type ElemType = (K, V) + + def addAccumulator(acc: MapType, elem: ElemType): MapType = { + val (k1, v1) = elem + acc += acc.find(_._1 == k1).map { + case (k2, v2) => k2 -> op(v1, v2) + }.getOrElse(elem) + + acc + } + + /* + * This method is allowed to modify and return the first value for efficiency. + * + * @see org.apache.spark.GrowableAccumulableParam.addInPlace(r1: R, r2: R): R + */ + def addInPlace(acc1: MapType, acc2: MapType): MapType = { + acc2.foreach(elem => addAccumulator(acc1, elem)) + acc1 + } + + /* + * @see org.apache.spark.GrowableAccumulableParam.zero(initialValue: R): R + */ + def zero(initialValue: MapType): MapType = { + val ser = new JavaSerializer(new SparkConf(false)).newInstance() + val copy = ser.deserialize[MapType](ser.serialize(initialValue)) + copy.clear() + copy + } +} + +object HashMapParam { + def apply[K, V](op: (V, V) => V): HashMapParam[K, V] = { + new HashMapParam[K, V](op) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala new file mode 100644 index 0000000..651d78b --- /dev/null +++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala @@ -0,0 +1,13 @@ +package org.apache.s2graph.spark.spark + +import org.apache.spark.rdd.RDD + +object RDDUtil { + def rddIsNonEmpty[T](rdd: RDD[T]): Boolean = { + !rddIsEmpty(rdd) + } + + def rddIsEmpty[T](rdd: RDD[T]): Boolean = { + rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala new file mode 100644 index 0000000..2e11904 --- /dev/null +++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala @@ -0,0 +1,120 @@ +package org.apache.s2graph.spark.spark + +import kafka.serializer.StringDecoder +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka.{KafkaUtils, StreamHelper} +import org.apache.spark.streaming.{Duration, StreamingContext} +import org.apache.spark.{Accumulable, Logging, SparkConf} + +import scala.collection.mutable.{HashMap => MutableHashMap} + +trait SparkApp extends Logging { + type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)] + + protected def args: Array[String] = _args + + private var _args: Array[String] = _ + + private var streamHelper: StreamHelper = _ + + // should implement in derived class + def run() + + def getArgs(index: Int) = args(index) + + def main(args: Array[String]) { + _args = args + run() + } + + def validateArgument(argNames: String*): Unit = { + if (args == null || args.length < argNames.length) { + System.err.println(s"Usage: ${getClass.getName} " + argNames.map(s => s"<$s>").mkString(" ")) + System.exit(1) + } + } + + def buildKafkaGroupId(topic: String, ext: String): String = { + val phase 
= System.getProperty("phase") + + // use first topic for group id + val groupId = s"${topic.split(',')(0)}_$ext" + + groupId + { + phase match { + case "real" | "production" => "" + case x => s"_$x" + } + } + } + + def getStreamHelper(kafkaParam: Map[String, String]): StreamHelper = { + if (streamHelper == null) { + this.synchronized { + if (streamHelper == null) { + streamHelper = StreamHelper(kafkaParam) + } + } + } + streamHelper + } + + def sparkConf(jobName: String): SparkConf = { + val conf = new SparkConf() + conf.setAppName(jobName) + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.streaming.unpersist", "true") + conf + } + + def streamingContext(sparkConf: SparkConf, interval: Duration, checkPoint: Option[String] = None) = { + val ssc = new StreamingContext(sparkConf, interval) + checkPoint.foreach { dir => + ssc.checkpoint(dir) + } + + // for watch tower + ssc.addStreamingListener(new SubscriberListener(ssc)) + + ssc + } + + def createKafkaPairStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[(String, String)] = { + val topicMap = topics.split(",").map((_, 1)).toMap + val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2) + numPartition.map(n => + stream.repartition(n) + ).getOrElse(stream) + } + + def createKafkaValueStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[String] = { + createKafkaPairStream(ssc, kafkaParam, topics, numPartition).map(_._2) + } + + def createKafkaPairStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[(String, String)] = { + // wait until all executor is running + Stream.continually(ssc.sparkContext.getExecutorStorageStatus).takeWhile(_.length < receiverCount).foreach { arr => + Thread.sleep(100) + } + Thread.sleep(1000) + + val topicMap = topics.split(",").map((_, 1)).toMap + + val stream = { + val streams = { + (1 to receiverCount) map { _ => + KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2) + } + } + ssc.union(streams) + } + numPartition.map(n => + stream.repartition(n) + ).getOrElse(stream) + } + + def createKafkaValueStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[String] = { + createKafkaPairStreamMulti(ssc, kafkaParam, topics, receiverCount, numPartition).map(_._2) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala new file mode 100644 index 0000000..f23debb --- /dev/null +++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala @@ -0,0 +1,20 @@ +package org.apache.s2graph.spark.spark + +import org.apache.spark.Logging +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStarted, StreamingListenerReceiverStopped} + 
+class SubscriberListener(ssc: StreamingContext) extends StreamingListener with Logging { + override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = { + logInfo("onReceiverError") + } + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + logInfo("onReceiverStarted") + } + + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = { + logInfo("onReceiverStopped") + ssc.stop() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala new file mode 100644 index 0000000..f28b9cf --- /dev/null +++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala @@ -0,0 +1,69 @@ +package org.apache.s2graph.spark.spark + +import java.util.Properties + +import kafka.producer.{Producer, ProducerConfig} + +trait WithKafka { + def kafkaConf(brokerList: String) = { + val props = new Properties() + props.put("metadata.broker.list", brokerList) + props.put("request.required.acks", "0") + props.put("producer.type", "async") + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("compression.codec", "1") + props.put("message.send.max.retries", "3") + props.put("batch.num.messages", "1000") + new ProducerConfig(props) + } + + def producerConfig(brokerList: String, requireAcks: String = "1", producerType: String = "sync") = { + val props = new Properties() + props.setProperty("metadata.broker.list", brokerList) + props.setProperty("request.required.acks", requireAcks) + props.setProperty("producer.type", producerType) + props.setProperty("serializer.class", "kafka.serializer.StringEncoder") + props.setProperty("compression.codec", "snappy") + props.setProperty("message.send.max.retries", "1") + new ProducerConfig(props) + } + + def getProducer[K, V](config: ProducerConfig): Producer[K, V] = { + new Producer[K, V](config) + } + + def getProducer[K, V](brokers: String): Producer[K, V] = { + getProducer(producerConfig(brokers)) + } + + /** + * Kafka DefaultPartitioner + * @param k + * @param n + * @return + */ + def getPartKey(k: Any, n: Int): Int = { + kafka.utils.Utils.abs(k.hashCode()) % n + } + + def makeKafkaGroupId(topic: String, ext: String): String = { + val phase = System.getProperty("phase") + + var groupId = s"${topic}_$ext" + + groupId += { + System.getProperty("spark.master") match { + case x if x.startsWith("local") => "_local" + case _ => "" + } + } + + groupId += { + phase match { + case "alpha" => "_alpha" + case _ => "" + }} + + groupId + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala b/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala index 56be543..768bedb 100644 --- a/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala +++ b/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala @@ -6,9 +6,6 @@ import org.apache.spark.rdd.RDD import scala.language.implicitConversions import 
scala.reflect.ClassTag -/** - * Created by hsleep([email protected]) on 15. 5. 6.. - */ class KafkaRDDFunctions[T: ClassTag](self: RDD[T]) extends Logging with Serializable http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala b/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala index 782f87f..0baf397 100644 --- a/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala +++ b/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala @@ -8,12 +8,8 @@ import kafka.serializer.Decoder import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.{Logging, SparkException} - import scala.reflect.ClassTag -/** - * Created by hsleep([email protected]) on 15. 4. 22.. - */ case class StreamHelper(kafkaParams: Map[String, String]) extends Logging { // helper for kafka zookeeper lazy val kafkaHelper = KafkaHelper(kafkaParams) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/s2/config/S2ConfigFactory.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/s2/config/S2ConfigFactory.scala b/spark/src/main/scala/s2/config/S2ConfigFactory.scala deleted file mode 100644 index 7666cdc..0000000 --- a/spark/src/main/scala/s2/config/S2ConfigFactory.scala +++ /dev/null @@ -1,27 +0,0 @@ -package s2.config - -import com.typesafe.config.{Config, ConfigFactory} - -/** - * Created by alec on 15. 3. 4.. - */ - -/** - * Config factory that loads the phase.conf file according to the current phase - */ -object S2ConfigFactory { - lazy val config: Config = _load - - @deprecated("do not call explicitly. use config", "0.0.6") - def load(): Config = { - _load - } - - def _load: Config = { - // default configuration file name : application.conf - val sysConfig = ConfigFactory.parseProperties(System.getProperties) - - lazy val phase = if (!sysConfig.hasPath("phase")) "alpha" else sysConfig.getString("phase") - sysConfig.withFallback(ConfigFactory.parseResourcesAnySyntax(s"$phase.conf")).withFallback(ConfigFactory.load()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/s2/spark/HashMapParam.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/s2/spark/HashMapParam.scala b/spark/src/main/scala/s2/spark/HashMapParam.scala deleted file mode 100644 index a84c687..0000000 --- a/spark/src/main/scala/s2/spark/HashMapParam.scala +++ /dev/null @@ -1,55 +0,0 @@ -package s2.spark - -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.{AccumulableParam, SparkConf} - -import scala.collection.mutable.{HashMap => MutableHashMap} - -/* - * Allows a mutable HashMap[String, Int] to be used as an accumulator in Spark. - * Whenever we try to put (k, v2) into an accumulator that already contains (k, v1), the result - * will be a HashMap containing (k, v1 + v2). - * - * Would have been nice to extend GrowableAccumulableParam instead of redefining everything, but it's - * private to the spark package. 
- */ - -class HashMapParam[K, V](op: (V, V) => V) extends AccumulableParam[MutableHashMap[K, V], (K, V)] { - type MapType = MutableHashMap[K, V] - type ElemType = (K, V) - - def addAccumulator(acc: MapType, elem: ElemType): MapType = { - val (k1, v1) = elem - acc += acc.find(_._1 == k1).map { - case (k2, v2) => k2 -> op(v1, v2) - }.getOrElse(elem) - - acc - } - - /* - * This method is allowed to modify and return the first value for efficiency. - * - * @see org.apache.spark.GrowableAccumulableParam.addInPlace(r1: R, r2: R): R - */ - def addInPlace(acc1: MapType, acc2: MapType): MapType = { - acc2.foreach(elem => addAccumulator(acc1, elem)) - acc1 - } - - /* - * @see org.apache.spark.GrowableAccumulableParam.zero(initialValue: R): R - */ - def zero(initialValue: MapType): MapType = { - val ser = new JavaSerializer(new SparkConf(false)).newInstance() - val copy = ser.deserialize[MapType](ser.serialize(initialValue)) - copy.clear() - copy - } -} - -object HashMapParam { - def apply[K, V](op: (V, V) => V): HashMapParam[K, V] = { - new HashMapParam[K, V](op) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/s2/spark/RDDUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/s2/spark/RDDUtil.scala b/spark/src/main/scala/s2/spark/RDDUtil.scala deleted file mode 100644 index c5a371a..0000000 --- a/spark/src/main/scala/s2/spark/RDDUtil.scala +++ /dev/null @@ -1,16 +0,0 @@ -package s2.spark - -import org.apache.spark.rdd.RDD - -/** - * Created by hsleep([email protected]) on 14. 12. 23.. - */ -object RDDUtil { - def rddIsNonEmpty[T](rdd: RDD[T]): Boolean = { - !rddIsEmpty(rdd) - } - - def rddIsEmpty[T](rdd: RDD[T]): Boolean = { - rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/s2/spark/SparkApp.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/s2/spark/SparkApp.scala b/spark/src/main/scala/s2/spark/SparkApp.scala deleted file mode 100644 index e27b7ec..0000000 --- a/spark/src/main/scala/s2/spark/SparkApp.scala +++ /dev/null @@ -1,124 +0,0 @@ -package s2.spark - -import kafka.serializer.StringDecoder -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.kafka.{KafkaUtils, StreamHelper} -import org.apache.spark.streaming.{Duration, StreamingContext} -import org.apache.spark.{Accumulable, Logging, SparkConf} - -import scala.collection.mutable.{HashMap => MutableHashMap} - - -/** - * Created by hsleep([email protected]) on 14. 12. 26.. 
- */ -trait SparkApp extends Logging { - type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)] - - protected def args: Array[String] = _args - - private var _args: Array[String] = _ - - private var streamHelper: StreamHelper = _ - - // should implement in derived class - def run() - - def getArgs(index: Int) = args(index) - - def main(args: Array[String]) { - _args = args - run() - } - - def validateArgument(argNames: String*): Unit = { - if (args == null || args.length < argNames.length) { - System.err.println(s"Usage: ${getClass.getName} " + argNames.map(s => s"<$s>").mkString(" ")) - System.exit(1) - } - } - - def buildKafkaGroupId(topic: String, ext: String): String = { - val phase = System.getProperty("phase") - - // use first topic for group id - val groupId = s"${topic.split(',')(0)}_$ext" - - groupId + { - phase match { - case "real" | "production" => "" - case x => s"_$x" - } - } - } - - def getStreamHelper(kafkaParam: Map[String, String]): StreamHelper = { - if (streamHelper == null) { - this.synchronized { - if (streamHelper == null) { - streamHelper = StreamHelper(kafkaParam) - } - } - } - streamHelper - } - - def sparkConf(jobName: String): SparkConf = { - val conf = new SparkConf() - conf.setAppName(jobName) - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.streaming.unpersist", "true") - conf - } - - def streamingContext(sparkConf: SparkConf, interval: Duration, checkPoint: Option[String] = None) = { - val ssc = new StreamingContext(sparkConf, interval) - checkPoint.foreach { dir => - ssc.checkpoint(dir) - } - - // for watch tower - ssc.addStreamingListener(new SubscriberListener(ssc)) - - ssc - } - - def createKafkaPairStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[(String, String)] = { - val topicMap = topics.split(",").map((_, 1)).toMap - val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2) - numPartition.map(n => - stream.repartition(n) - ).getOrElse(stream) - } - - def createKafkaValueStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[String] = { - createKafkaPairStream(ssc, kafkaParam, topics, numPartition).map(_._2) - } - - def createKafkaPairStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[(String, String)] = { - // wait until all executor is running - Stream.continually(ssc.sparkContext.getExecutorStorageStatus).takeWhile(_.length < receiverCount).foreach { arr => - Thread.sleep(100) - } - Thread.sleep(1000) - - val topicMap = topics.split(",").map((_, 1)).toMap - - val stream = { - val streams = { - (1 to receiverCount) map { _ => - KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2) - } - } - ssc.union(streams) - } - numPartition.map(n => - stream.repartition(n) - ).getOrElse(stream) - } - - def createKafkaValueStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[String] = { - createKafkaPairStreamMulti(ssc, kafkaParam, topics, receiverCount, numPartition).map(_._2) - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/s2/spark/SubscriberListener.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/s2/spark/SubscriberListener.scala b/spark/src/main/scala/s2/spark/SubscriberListener.scala deleted file mode 100644 index 9e156a7..0000000 --- a/spark/src/main/scala/s2/spark/SubscriberListener.scala +++ /dev/null @@ -1,24 +0,0 @@ -package s2.spark - -import org.apache.spark.Logging -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStarted, StreamingListenerReceiverStopped} - -/** - * Created by hsleep([email protected]) on 15. 1. 8.. - */ - -class SubscriberListener(ssc: StreamingContext) extends StreamingListener with Logging { - override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = { - logInfo("onReceiverError") - } - - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { - logInfo("onReceiverStarted") - } - - override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = { - logInfo("onReceiverStopped") - ssc.stop() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/main/scala/s2/spark/WithKafka.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/s2/spark/WithKafka.scala b/spark/src/main/scala/s2/spark/WithKafka.scala deleted file mode 100644 index 9bd5944..0000000 --- a/spark/src/main/scala/s2/spark/WithKafka.scala +++ /dev/null @@ -1,69 +0,0 @@ -package s2.spark - -import java.util.Properties - -import kafka.producer.{Producer, ProducerConfig} - -trait WithKafka { - def kafkaConf(brokerList: String) = { - val props = new Properties() - props.put("metadata.broker.list", brokerList) - props.put("request.required.acks", "0") - props.put("producer.type", "async") - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("compression.codec", "1") - props.put("message.send.max.retries", "3") - props.put("batch.num.messages", "1000") - new ProducerConfig(props) - } - - def producerConfig(brokerList: String, requireAcks: String = "1", producerType: String = "sync") = { - val props = new Properties() - props.setProperty("metadata.broker.list", brokerList) - props.setProperty("request.required.acks", requireAcks) - props.setProperty("producer.type", producerType) - props.setProperty("serializer.class", "kafka.serializer.StringEncoder") - props.setProperty("compression.codec", "snappy") - props.setProperty("message.send.max.retries", "1") - new ProducerConfig(props) - } - - def getProducer[K, V](config: ProducerConfig): Producer[K, V] = { - new Producer[K, V](config) - } - - def getProducer[K, V](brokers: String): Producer[K, V] = { - getProducer(producerConfig(brokers)) - } - - /** - * Kafka DefaultPartitioner - * @param k - * @param n - * @return - */ - def getPartKey(k: Any, n: Int): Int = { - kafka.utils.Utils.abs(k.hashCode()) % n - } - - def makeKafkaGroupId(topic: String, ext: String): String = { - val phase = System.getProperty("phase") - - var groupId = s"${topic}_$ext" - - groupId += { - System.getProperty("spark.master") match { - case x if x.startsWith("local") => "_local" - case _ => "" - } - } - - groupId += { - phase match { - case "alpha" => "_alpha" - case _ => "" - }} - - groupId - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala b/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala new file mode 100644 index 0000000..2d48a89 --- /dev/null +++ b/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala @@ -0,0 +1,17 @@ +package org.apache.s2graph.spark + +import org.apache.s2graph.spark.spark.SparkApp +import org.scalatest.{FunSuite, Matchers} + +object TestApp extends SparkApp { + override def run(): Unit = { + validateArgument("topic", "phase") + } +} + +class SparkAppTest extends FunSuite with Matchers { + test("parse argument") { + TestApp.main(Array("s2graphInreal", "real")) + TestApp.getArgs(0) shouldEqual "s2graphInreal" + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala b/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala new file mode 100644 index 0000000..df6c7e5 --- /dev/null +++ b/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala @@ -0,0 +1,31 @@ +package org.apache.s2graph.spark + +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.specs2.mutable.Specification +import org.specs2.specification.BeforeAfterAll + +class TestStreamingSpec extends Specification with BeforeAfterAll { + private val master = "local[2]" + private val appName = "test_streaming" + private val batchDuration = Seconds(1) + + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + + override def beforeAll(): Unit = { + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + ssc = new StreamingContext(conf, batchDuration) + + sc = ssc.sparkContext + } + + override def afterAll(): Unit = { + if (ssc != null) { + ssc.stop() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/test/scala/s2/spark/SparkAppTest.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/s2/spark/SparkAppTest.scala b/spark/src/test/scala/s2/spark/SparkAppTest.scala deleted file mode 100644 index 6bf2482..0000000 --- a/spark/src/test/scala/s2/spark/SparkAppTest.scala +++ /dev/null @@ -1,21 +0,0 @@ -package s2.spark - -import org.scalatest.{FunSuite, Matchers} - -/** - * Created by alec.k on 14. 12. 26.. 
- */ - -object TestApp extends SparkApp { - // function that must be implemented in the derived class - override def run(): Unit = { - validateArgument("topic", "phase") - } -} - -class SparkAppTest extends FunSuite with Matchers { - test("parse argument") { - TestApp.main(Array("s2graphInreal", "real")) - TestApp.getArgs(0) shouldEqual "s2graphInreal" - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/spark/src/test/scala/s2/spark/TestStreamingSpec.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/s2/spark/TestStreamingSpec.scala b/spark/src/test/scala/s2/spark/TestStreamingSpec.scala deleted file mode 100644 index bf15618..0000000 --- a/spark/src/test/scala/s2/spark/TestStreamingSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -package s2.spark - -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.{SparkConf, SparkContext} -import org.specs2.mutable.Specification -import org.specs2.specification.BeforeAfterAll - -/** - * Created by hsleep([email protected]) on 15. 6. 17.. - */ -class TestStreamingSpec extends Specification with BeforeAfterAll { - private val master = "local[2]" - private val appName = "test_streaming" - private val batchDuration = Seconds(1) - - private var sc: SparkContext = _ - private var ssc: StreamingContext = _ - - override def beforeAll(): Unit = { - val conf = new SparkConf() - .setMaster(master) - .setAppName(appName) - - ssc = new StreamingContext(conf, batchDuration) - - sc = ssc.sparkContext - } - - override def afterAll(): Unit = { - if (ssc != null) { - ssc.stop() - } - } -}
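For context on the relocated HashMapParam above: the sketch below is not part of this commit; it is a minimal illustration of how the accumulable could be wired into a Spark 1.x job, matching the HashMapAccumulable alias declared in SparkApp. The object name HashMapParamUsage and the local SparkContext setup are illustrative assumptions, not code from the repository.

// Illustrative only, not part of the patch. Assumes Spark 1.x (Accumulable API).
import org.apache.s2graph.spark.spark.HashMapParam
import org.apache.spark.{Accumulable, SparkConf, SparkContext}

import scala.collection.mutable.{HashMap => MutableHashMap}

object HashMapParamUsage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulable-sketch"))

    // values arriving under the same key are merged with the supplied op (Long addition here)
    val counters: Accumulable[MutableHashMap[String, Long], (String, Long)] =
      sc.accumulable(MutableHashMap.empty[String, Long])(HashMapParam[String, Long](_ + _))

    sc.parallelize(Seq("a", "b", "a")).foreach { key => counters += (key -> 1L) }

    // only the driver reads the merged result, e.g. Map(a -> 2, b -> 1)
    println(counters.value)
    sc.stop()
  }
}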
