Repository: incubator-s2graph
Updated Branches:
  refs/heads/master b6fe32fc2 -> b8a152176 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala b/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
deleted file mode 100644
index a387ba5..0000000
--- a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package benchmark
-
-import play.api.libs.json.JsNumber
-import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
-import play.libs.Json
-
-class JsonBenchmarkSpec extends BenchmarkCommon {
-  "to json" >> {
-    "json benchmark" >> {
-
-      duration("map to json") {
-        (0 to 10) foreach { n =>
-          val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
-          Json.toJson(numberMaps)
-        }
-      }
-
-      duration("directMakeJson") {
-        (0 to 10) foreach { n =>
-          var jsObj = play.api.libs.json.Json.obj()
-          (0 to 10).foreach { n =>
-            jsObj += (n.toString -> JsNumber(n * n))
-          }
-        }
-      }
-
-      duration("map to json 2") {
-        (0 to 50) foreach { n =>
-          val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap
-          Json.toJson(numberMaps)
-        }
-      }
-
-      duration("directMakeJson 2") {
-        (0 to 50) foreach { n =>
-          var jsObj = play.api.libs.json.Json.obj()
-          (0 to 10).foreach { n =>
-            jsObj += (n.toString -> JsNumber(n * n))
-          }
-        }
-      }
-      true
-    }
-    true
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala b/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
deleted file mode 100644
index d2d3624..0000000
--- a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-package benchmark
-
-import com.kakao.s2graph.core.OrderingUtil._
-import com.kakao.s2graph.core.SeqMultiOrdering
-import play.api.libs.json.{JsNumber, JsValue}
-import play.api.test.PlaySpecification
-import play.api.{Application => PlayApplication}
-
-import scala.util.Random
-
-class OrderingUtilBenchmarkSpec extends BenchmarkCommon {
-  "OrderingUtilBenchmarkSpec" should {
-
-    "performance MultiOrdering any" >> {
-      val tupLs = (0 until 10) map { i =>
-        Random.nextDouble() -> Random.nextLong()
-      }
-
-      val seqLs = tupLs.map { tup =>
-        Seq(tup._1, tup._2)
-      }
-
-      val sorted1 = duration("TupleOrdering double,long") {
-        (0 until 1000) foreach { _ =>
-          tupLs.sortBy { case (x, y) =>
-            -x -> -y
-          }
-        }
-        tupLs.sortBy { case (x, y) =>
-          -x -> -y
-        }
-      }.map { x => x._1 }
-
-      val sorted2 = duration("MultiOrdering double,long") {
-        (0 until 1000) foreach { _ =>
-          seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
-        }
-        seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
-      }.map { x => x.head }
-
-      sorted1.toString() must_== sorted2.toString()
-    }
-
-    "performance MultiOrdering double" >> {
-      val tupLs = (0 until 50) map { i =>
-        Random.nextDouble() -> Random.nextDouble()
-      }
-
-      val seqLs = tupLs.map { tup =>
-        Seq(tup._1, tup._2)
-      }
-
-      duration("MultiOrdering double") {
-        (0 until 1000) foreach { _ =>
-          seqLs.sorted(new SeqMultiOrdering[Double](Seq(false, false)))
-        }
-      }
-
-      duration("TupleOrdering double") {
-        (0 until 1000) foreach { _ =>
-          tupLs.sortBy { case (x, y) =>
-            -x -> -y
-          }
-        }
-      }
-
-      1 must_== 1
-    }
-
-    "performance MultiOrdering jsvalue" >> {
-      val tupLs = (0 until 50) map { i =>
-        Random.nextDouble() -> Random.nextLong()
-      }
-
-      val seqLs = tupLs.map { tup =>
-        Seq(JsNumber(tup._1), JsNumber(tup._2))
-      }
-
-      val sorted1 = duration("TupleOrdering double,long") {
-        (0 until 1000) foreach { _ =>
-          tupLs.sortBy { case (x, y) =>
-            -x -> -y
-          }
-        }
-        tupLs.sortBy { case (x, y) =>
-          -x -> -y
-        }
-      }
-
-      val sorted2 = duration("MultiOrdering jsvalue") {
-        (0 until 1000) foreach { _ =>
-          seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
-        }
-        seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
-      }
-
-      1 must_== 1
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala b/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala
deleted file mode 100644
index 0c27a2a..0000000
--- a/s2rest_play/test/benchmark/SamplingBenchmarkSpec.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package benchmark
-import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
-import scala.annotation.tailrec
-import scala.util.Random
-
-class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
-  "sample" should {
-    implicit val app = FakeApplication()
-
-    "sample benchmark" in new WithApplication(app) {
-      @tailrec
-      def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-        if (set.size == n) set
-        else randomInt(n, range, set + Random.nextInt(range))
-      }
-
-      // sample using random array
-      def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
-        val randomNum = randomInt(num, ls.size)
-        var sample = List.empty[T]
-        var idx = 0
-        ls.foreach { e =>
-          if (randomNum.contains(idx)) sample = e :: sample
-          idx += 1
-        }
-        sample
-      }
-
-      // sample using shuffle
-      def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
-        Random.shuffle(ls).take(num)
-      }
-
-      // sample using random number generation
-      def rngSample[T](num: Int, ls: List[T]): List[T] = {
-        var sampled = List.empty[T]
-        val N = ls.size // population
-        var t = 0 // total input records dealt with
-        var m = 0 // number of items selected so far
-
-        while (m < num) {
-          val u = Random.nextDouble()
-          if ( (N - t)*u < num - m) {
-            sampled = ls(t) :: sampled
-            m += 1
-          }
-          t += 1
-        }
-        sampled
-      }
-
-      // test data
-      val testLimit = 500000
-      val testNum = 10
-      val testData = (0 to 1000).toList
-
-      // dummy for warm-up
-      (0 to testLimit) foreach { n =>
-        randomArraySample(testNum, testData)
-        shuffleSample(testNum, testData)
-        rngSample(testNum, testData)
-      }
-
-      duration("Random Array Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = randomArraySample(testNum, testData)
-        }
-      }
-
-      duration("Shuffle Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = shuffleSample(testNum, testData)
-        }
-      }
-
-      duration("RNG Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = rngSample(testNum, testData)
-        }
-      }
-    }
-
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/controllers/PostProcessSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/controllers/PostProcessSpec.scala b/s2rest_play/test/controllers/PostProcessSpec.scala
deleted file mode 100644
index cea132a..0000000
--- a/s2rest_play/test/controllers/PostProcessSpec.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core.{OrderingUtil, SeqMultiOrdering}
-import play.api.libs.json.{JsNumber, JsString, JsValue}
-import play.api.test.PlaySpecification
-
-class PostProcessSpec extends PlaySpecification {
-  import OrderingUtil._
-
-  "test order by json" >> {
-    val jsLs: Seq[Seq[JsValue]] = Seq(
-      Seq(JsNumber(0), JsString("a")),
-      Seq(JsNumber(0), JsString("b")),
-      Seq(JsNumber(1), JsString("a")),
-      Seq(JsNumber(1), JsString("b")),
-      Seq(JsNumber(2), JsString("c"))
-    )
-
-    // number descending, string ascending
-    val sortedJsLs: Seq[Seq[JsValue]] = Seq(
-      Seq(JsNumber(2), JsString("c")),
-      Seq(JsNumber(1), JsString("a")),
-      Seq(JsNumber(1), JsString("b")),
-      Seq(JsNumber(0), JsString("a")),
-      Seq(JsNumber(0), JsString("b"))
-    )
-
-    val orderParam: Seq[Boolean] = Seq(false, true)
-    val resultJsLs = jsLs.sorted(new Ordering[Seq[JsValue]] {
-      override def compare(x: Seq[JsValue], y: Seq[JsValue]): Int = {
-        val xe = x.iterator
-        val ye = y.iterator
-        val oe = orderParam.iterator
-
-        while (xe.hasNext && ye.hasNext && oe.hasNext) {
-          val (xev, yev) = oe.next() match {
-            case true => xe.next() -> ye.next()
-            case false => ye.next() -> xe.next()
-          }
-          val res = (xev, yev) match {
-            case (JsNumber(xv), JsNumber(yv)) =>
-              Ordering[BigDecimal].compare(xv, yv)
-            case (JsString(xv), JsString(yv)) =>
-              Ordering[String].compare(xv, yv)
-            case _ => throw new Exception("type mismatch")
-          }
-          if (res != 0) return res
-        }
-
-        Ordering.Boolean.compare(xe.hasNext, ye.hasNext)
-      }
-    })
-
-    resultJsLs.toString() must_== sortedJsLs.toString
-  }
-
-  "test order by primitive type" >> {
-    val jsLs: Seq[Seq[Any]] = Seq(
-      Seq(0, "a"),
-      Seq(0, "b"),
-      Seq(1, "a"),
-      Seq(1, "b"),
-      Seq(2, "c")
-    )
-
-    // number descending, string ascending
-    val sortedJsLs: Seq[Seq[Any]] = Seq(
-      Seq(2, "c"),
-      Seq(1, "a"),
-      Seq(1, "b"),
-      Seq(0, "a"),
-      Seq(0, "b")
-    )
-
-    val ascendingLs: Seq[Boolean] = Seq(false, true)
-    val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs))
-
-    resultJsLs.toString() must_== sortedJsLs.toString
-  }
-
-  "test order by primitive type with short ascending list" >> {
-    val jsLs: Seq[Seq[Any]] = Seq(
-      Seq(0, "a"),
-      Seq(1, "b"),
-      Seq(0, "b"),
-      Seq(1, "a"),
-      Seq(2, "c"),
-      Seq(1, "c"),
-      Seq(1, "d"),
-      Seq(1, "f"),
-      Seq(1, "e")
-    )
-
-    // number descending, string ascending(default)
-    val sortedJsLs: Seq[Seq[Any]] = Seq(
-      Seq(2, "c"),
-      Seq(1, "a"),
-      Seq(1, "b"),
-      Seq(1, "c"),
-      Seq(1, "d"),
-      Seq(1, "e"),
-      Seq(1, "f"),
-      Seq(0, "a"),
-      Seq(0, "b")
-    )
-
-    val ascendingLs: Seq[Boolean] = Seq(false)
-    val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs))
-
-    resultJsLs.toString() must_== sortedJsLs.toString
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala
new file mode 100644
index 0000000..3d77e16
--- /dev/null
+++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/BenchmarkCommon.scala
@@ -0,0 +1,24 @@
+package org.apache.s2graph.rest.play.benchmark
+
+import org.specs2.mutable.Specification
+
+trait BenchmarkCommon extends Specification {
+  val wrapStr = s"\n=================================================="
+
+  def duration[T](prefix: String = "")(block: => T) = {
+    val startTs = System.currentTimeMillis()
+    val ret = block
+    val endTs = System.currentTimeMillis()
+    println(s"$wrapStr\n$prefix: took ${endTs - startTs} ms$wrapStr")
+    ret
+  }
+
+  def durationWithReturn[T](prefix: String = "")(block: => T): (T, Long) = {
+    val startTs = System.currentTimeMillis()
+    val ret = block
+    val endTs = System.currentTimeMillis()
+    val duration = endTs - startTs
+//    println(s"$wrapStr\n$prefix: took $duration ms$wrapStr")
+    (ret, duration)
+  }
+}
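
A minimal usage sketch for these helpers (the spec name and timed block are hypothetical, assuming the specs2 mutable style this trait extends):

    class SumBenchmarkSpec extends BenchmarkCommon {
      "sum benchmark" >> {
        // duration prints "sum: took N ms" and returns the block's value
        val total = duration("sum") {
          (1L to 1000000L).sum
        }
        // durationWithReturn yields the value plus elapsed millis, without printing
        val (same, elapsedMs) = durationWithReturn("sum")((1L to 1000000L).sum)
        same must_== total
      }
    }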

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala
new file mode 100644
index 0000000..52ef4c5
--- /dev/null
+++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/GraphUtilSpec.scala
@@ -0,0 +1,124 @@
+package org.apache.s2graph.rest.play.benchmark
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.types.{HBaseType, InnerVal, SourceVertexId}
+import play.api.test.{FakeApplication, PlaySpecification}
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+class GraphUtilSpec extends BenchmarkCommon with PlaySpecification {
+
+  def between(bytes: Array[Byte], startKey: Array[Byte], endKey: Array[Byte]): Boolean =
+    Bytes.compareTo(startKey, bytes) <= 0 && Bytes.compareTo(endKey, bytes) >= 0
+
+  def betweenShort(value: Short, start: Short, end: Short): Boolean =
+    start <= value && value <= end
+
+
+  "GraphUtil" should {
+    "test murmur3 hash function distribution" in {
+      val testNum = 1000000
+      val bucketSize = Short.MaxValue / 40
+      val countsNew = new mutable.HashMap[Int, Int]()
+      val counts = new mutable.HashMap[Int, Int]()
+      for {
+        i <- (0 until testNum)
+      } {
+        val h = GraphUtil.murmur3(i.toString) / bucketSize
+        val hNew = GraphUtil.murmur3Int(i.toString) / bucketSize
+        counts += (h -> (counts.getOrElse(h, 0) + 1))
+        countsNew += (hNew -> (countsNew.getOrElse(hNew, 0) + 1))
+      }
+      val all = counts.toList.sortBy { case (bucket, count) => count }.reverse
+      val allNew = countsNew.toList.sortBy { case (bucket, count) => count }.reverse
+      val top = all.take(10)
+      val bottom = all.takeRight(10)
+      val topNew = allNew.take(10)
+      val bottomNew = allNew.takeRight(10)
+      println(s"Top: $top")
+      println(s"Bottom: $bottom")
+      println("-" * 50)
+      println(s"TopNew: $topNew")
+      println(s"Bottom: $bottomNew")
+      true
+    }
+
+    "test murmur hash skew2" in {
+      running(FakeApplication()) {
+        import HBaseType._
+        val testNum = 1000000L
+        val regionCount = 40
+        val window = Int.MaxValue / regionCount
+        val rangeBytes = new ListBuffer[(List[Byte], List[Byte])]()
+        for {
+          i <- (0 until regionCount)
+        } yield {
+          val startKey =  Bytes.toBytes(i * window)
+          val endKey = Bytes.toBytes((i + 1) * window)
+          rangeBytes += (startKey.toList -> endKey.toList)
+        }
+
+
+
+        val stats = new collection.mutable.HashMap[Int, ((List[Byte], List[Byte]), Long)]()
+        val counts = new collection.mutable.HashMap[Short, Long]()
+        stats += (0 -> (rangeBytes.head -> 0L))
+
+        for (i <- (0L until testNum)) {
+          val vertexId = SourceVertexId(DEFAULT_COL_ID, InnerVal.withLong(i, HBaseType.DEFAULT_VERSION))
+          val bytes = vertexId.bytes
+          val shortKey = GraphUtil.murmur3(vertexId.innerId.toIdString())
+          val shortVal = counts.getOrElse(shortKey, 0L) + 1L
+          counts += (shortKey -> shortVal)
+          var j = 0
+          var found = false
+          while (j < rangeBytes.size && !found) {
+            val (start, end) = rangeBytes(j)
+            if (between(bytes, start.toArray, end.toArray)) {
+              found = true
+            }
+            j += 1
+          }
+          val head = rangeBytes(j - 1)
+          val key = j - 1
+          val value = stats.get(key) match {
+            case None => 0L
+            case Some(v) => v._2 + 1
+          }
+          stats += (key -> (head, value))
+        }
+        val sorted = stats.toList.sortBy(kv => kv._2._2).reverse
+        println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ 
EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount")
+        sorted.foreach { case (idx, ((start, end), cnt)) =>
+          val startShort = Bytes.toShort(start.take(2).toArray)
+          val endShort = Bytes.toShort(end.take(2).toArray)
+          val count = counts.count(t => startShort <= t._1 && t._1 < endShort)
+          println(s"$idx: $start ~ $end\t${start.take(2)} ~ 
${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count")
+
+        }
+        println("\n" * 10)
+        println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ 
EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount")
+        stats.toList.sortBy(kv => kv._1).reverse.foreach { case (idx, ((start, end), cnt)) =>
+          val startShort = Bytes.toShort(start.take(2).toArray)
+          val endShort = Bytes.toShort(end.take(2).toArray)
+          val count = counts.count(t => startShort <= t._1 && t._1 < endShort)
+          println(s"$idx: $start ~ $end\t${start.take(2)} ~ 
${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count")
+
+        }
+      }
+      true
+    }
+
+    "Bytes compareTo" in {
+      val x = Array[Byte](11, -12, -26, -14, -23)
+      val startKey = Array[Byte](0, 0, 0, 0)
+      val endKey = Array[Byte](12, -52, -52, -52)
+      println(Bytes.compareTo(startKey, x))
+      println(Bytes.compareTo(endKey, x))
+      true
+    }
+  }
+
+}
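
The distribution test above is a bucket histogram over hash values. The same idea as a standalone sketch, using the standard library's MurmurHash3 rather than s2graph's GraphUtil (the two hashes differ, so exact counts will not match):

    import scala.util.hashing.MurmurHash3

    val bucketCount = 40
    val histogram = (0 until 1000000)
      .map { i =>
        val h = MurmurHash3.stringHash(i.toString)
        ((h % bucketCount) + bucketCount) % bucketCount  // non-negative bucket index
      }
      .groupBy(identity)
      .map { case (bucket, hits) => bucket -> hits.size }

    // a well-distributed hash keeps the largest and smallest buckets close in size
    println(s"max=${histogram.values.max} min=${histogram.values.min}")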

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala
new file mode 100644
index 0000000..ee12b8d
--- /dev/null
+++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/JsonBenchmarkSpec.scala
@@ -0,0 +1,45 @@
+package org.apache.s2graph.rest.play.benchmark
+
+import play.api.libs.json.JsNumber
+import play.libs.Json
+
+class JsonBenchmarkSpec extends BenchmarkCommon {
+  "to json" >> {
+    "json benchmark" >> {
+
+      duration("map to json") {
+        (0 to 10) foreach { n =>
+          val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
+          Json.toJson(numberMaps)
+        }
+      }
+
+      duration("directMakeJson") {
+        (0 to 10) foreach { n =>
+          var jsObj = play.api.libs.json.Json.obj()
+          (0 to 10).foreach { n =>
+            jsObj += (n.toString -> JsNumber(n * n))
+          }
+        }
+      }
+
+      duration("map to json 2") {
+        (0 to 50) foreach { n =>
+          val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap
+          Json.toJson(numberMaps)
+        }
+      }
+
+      duration("directMakeJson 2") {
+        (0 to 50) foreach { n =>
+          var jsObj = play.api.libs.json.Json.obj()
+          (0 to 10).foreach { n =>
+            jsObj += (n.toString -> JsNumber(n * n))
+          }
+        }
+      }
+      true
+    }
+    true
+  }
+}
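
The two strategies being timed, isolated for clarity (a sketch against the Scala play-json API; note the spec itself calls play.libs.Json, Play's Java-side helper, while building values with play.api.libs.json types):

    import play.api.libs.json.{JsNumber, JsObject, Json}

    // strategy 1: build an immutable Map first, convert to JSON once
    val viaMap = Json.toJson((0 to 10).map(n => n.toString -> JsNumber(n * n)).toMap)

    // strategy 2: grow a JsObject one field at a time
    var direct: JsObject = Json.obj()
    (0 to 10).foreach { n =>
      direct += (n.toString -> JsNumber(n * n))
    }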

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala
new file mode 100644
index 0000000..cc194d7
--- /dev/null
+++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/OrderingUtilBenchmarkSpec.scala
@@ -0,0 +1,98 @@
+package org.apache.s2graph.rest.play.benchmark
+
+import org.apache.s2graph.core.OrderingUtil._
+import org.apache.s2graph.core.{OrderingUtil, SeqMultiOrdering}
+import play.api.libs.json.{JsNumber, JsValue}
+
+import scala.util.Random
+
+class OrderingUtilBenchmarkSpec extends BenchmarkCommon {
+  "OrderingUtilBenchmarkSpec" should {
+
+    "performance MultiOrdering any" >> {
+      val tupLs = (0 until 10) map { i =>
+        Random.nextDouble() -> Random.nextLong()
+      }
+
+      val seqLs = tupLs.map { tup =>
+        Seq(tup._1, tup._2)
+      }
+
+      val sorted1 = duration("TupleOrdering double,long") {
+        (0 until 1000) foreach { _ =>
+          tupLs.sortBy { case (x, y) =>
+            -x -> -y
+          }
+        }
+        tupLs.sortBy { case (x, y) =>
+          -x -> -y
+        }
+      }.map { x => x._1 }
+
+      val sorted2 = duration("MultiOrdering double,long") {
+        (0 until 1000) foreach { _ =>
+          seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
+        }
+        seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
+      }.map { x => x.head }
+
+      sorted1.toString() must_== sorted2.toString()
+    }
+
+    "performance MultiOrdering double" >> {
+      val tupLs = (0 until 50) map { i =>
+        Random.nextDouble() -> Random.nextDouble()
+      }
+
+      val seqLs = tupLs.map { tup =>
+        Seq(tup._1, tup._2)
+      }
+
+      duration("MultiOrdering double") {
+        (0 until 1000) foreach { _ =>
+          seqLs.sorted(new SeqMultiOrdering[Double](Seq(false, false)))
+        }
+      }
+
+      duration("TupleOrdering double") {
+        (0 until 1000) foreach { _ =>
+          tupLs.sortBy { case (x, y) =>
+            -x -> -y
+          }
+        }
+      }
+
+      1 must_== 1
+    }
+
+    "performance MultiOrdering jsvalue" >> {
+      val tupLs = (0 until 50) map { i =>
+        Random.nextDouble() -> Random.nextLong()
+      }
+
+      val seqLs = tupLs.map { tup =>
+        Seq(JsNumber(tup._1), JsNumber(tup._2))
+      }
+
+      val sorted1 = duration("TupleOrdering double,long") {
+        (0 until 1000) foreach { _ =>
+          tupLs.sortBy { case (x, y) =>
+            -x -> -y
+          }
+        }
+        tupLs.sortBy { case (x, y) =>
+          -x -> -y
+        }
+      }
+
+      val sorted2 = duration("MultiOrdering jsvalue") {
+        (0 until 1000) foreach { _ =>
+          seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
+        }
+        seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
+      }
+
+      1 must_== 1
+    }
+  }
+}
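
What the benchmark compares: sortBy with negated tuple keys versus SeqMultiOrdering, which walks a Seq of keys with per-position ascending flags (false = descending). A minimal sketch of the equivalence being timed, mirroring the spec's own usage:

    import org.apache.s2graph.core.OrderingUtil._
    import org.apache.s2graph.core.SeqMultiOrdering

    val rows = Seq(Seq[Any](0.5, 9L), Seq[Any](1.0, 1L), Seq[Any](1.0, 2L))
    // both keys descending
    val sorted = rows.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
    // sorted == Seq(Seq(1.0, 2L), Seq(1.0, 1L), Seq(0.5, 9L))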

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala
new file mode 100644
index 0000000..2db95e4
--- /dev/null
+++ b/s2rest_play/test/org/apache/s2graph/rest/play/benchmark/SamplingBenchmarkSpec.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.rest.play.benchmark
+
+import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
+
+import scala.annotation.tailrec
+import scala.util.Random
+
+class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
+  "sample" should {
+    implicit val app = FakeApplication()
+
+    "sample benchmark" in new WithApplication(app) {
+      @tailrec
+      def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+        if (set.size == n) set
+        else randomInt(n, range, set + Random.nextInt(range))
+      }
+
+      // sample using random array
+      def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
+        val randomNum = randomInt(num, ls.size)
+        var sample = List.empty[T]
+        var idx = 0
+        ls.foreach { e =>
+          if (randomNum.contains(idx)) sample = e :: sample
+          idx += 1
+        }
+        sample
+      }
+
+      // sample using shuffle
+      def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
+        Random.shuffle(ls).take(num)
+      }
+
+      // sample using random number generation
+      def rngSample[T](num: Int, ls: List[T]): List[T] = {
+        var sampled = List.empty[T]
+        val N = ls.size // population
+        var t = 0 // total input records dealt with
+        var m = 0 // number of items selected so far
+
+        while (m < num) {
+          val u = Random.nextDouble()
+          if ((N - t) * u < num - m) {
+            sampled = ls(t) :: sampled
+            m += 1
+          }
+          t += 1
+        }
+        sampled
+      }
+
+      // test data
+      val testLimit = 500000
+      val testNum = 10
+      val testData = (0 to 1000).toList
+
+      // dummy for warm-up
+      (0 to testLimit) foreach { n =>
+        randomArraySample(testNum, testData)
+        shuffleSample(testNum, testData)
+        rngSample(testNum, testData)
+      }
+
+      duration("Random Array Sampling") {
+        (0 to testLimit) foreach { _ =>
+          val sampled = randomArraySample(testNum, testData)
+        }
+      }
+
+      duration("Shuffle Sampling") {
+        (0 to testLimit) foreach { _ =>
+          val sampled = shuffleSample(testNum, testData)
+        }
+      }
+
+      duration("RNG Sampling") {
+        (0 to testLimit) foreach { _ =>
+          val sampled = rngSample(testNum, testData)
+        }
+      }
+    }
+
+
+  }
+}
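
rngSample is selection sampling (Knuth's Algorithm S): with m of num elements already chosen among the first t, element t is kept with probability (num - m) / (N - t), which yields a uniform sample in one pass without shuffling. A sanity check one could run with the same helper (hypothetical, not part of the spec):

    val data = (0 to 1000).toList
    val picks = List.fill(10000)(rngSample(10, data)).flatten
    val freq = picks.groupBy(identity).mapValues(_.size)
    // a uniform sampler spreads hits roughly evenly across all 1001 values
    println(s"max=${freq.values.max} min=${freq.values.min}")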

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala b/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala
new file mode 100644
index 0000000..998aef8
--- /dev/null
+++ b/s2rest_play/test/org/apache/s2graph/rest/play/controllers/PostProcessSpec.scala
@@ -0,0 +1,112 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.{OrderingUtil, SeqMultiOrdering}
+import play.api.libs.json.{JsNumber, JsString, JsValue}
+import play.api.test.PlaySpecification
+
+class PostProcessSpec extends PlaySpecification {
+  import OrderingUtil._
+
+  "test order by json" >> {
+    val jsLs: Seq[Seq[JsValue]] = Seq(
+      Seq(JsNumber(0), JsString("a")),
+      Seq(JsNumber(0), JsString("b")),
+      Seq(JsNumber(1), JsString("a")),
+      Seq(JsNumber(1), JsString("b")),
+      Seq(JsNumber(2), JsString("c"))
+    )
+
+    // number descending, string ascending
+    val sortedJsLs: Seq[Seq[JsValue]] = Seq(
+      Seq(JsNumber(2), JsString("c")),
+      Seq(JsNumber(1), JsString("a")),
+      Seq(JsNumber(1), JsString("b")),
+      Seq(JsNumber(0), JsString("a")),
+      Seq(JsNumber(0), JsString("b"))
+    )
+
+    val orderParam: Seq[Boolean] = Seq(false, true)
+    val resultJsLs = jsLs.sorted(new Ordering[Seq[JsValue]] {
+      override def compare(x: Seq[JsValue], y: Seq[JsValue]): Int = {
+        val xe = x.iterator
+        val ye = y.iterator
+        val oe = orderParam.iterator
+
+        while (xe.hasNext && ye.hasNext && oe.hasNext) {
+          val (xev, yev) = oe.next() match {
+            case true => xe.next() -> ye.next()
+            case false => ye.next() -> xe.next()
+          }
+          val res = (xev, yev) match {
+            case (JsNumber(xv), JsNumber(yv)) =>
+              Ordering[BigDecimal].compare(xv, yv)
+            case (JsString(xv), JsString(yv)) =>
+              Ordering[String].compare(xv, yv)
+            case _ => throw new Exception("type mismatch")
+          }
+          if (res != 0) return res
+        }
+
+        Ordering.Boolean.compare(xe.hasNext, ye.hasNext)
+      }
+    })
+
+    resultJsLs.toString() must_== sortedJsLs.toString
+  }
+
+  "test order by primitive type" >> {
+    val jsLs: Seq[Seq[Any]] = Seq(
+      Seq(0, "a"),
+      Seq(0, "b"),
+      Seq(1, "a"),
+      Seq(1, "b"),
+      Seq(2, "c")
+    )
+
+    // number descending, string ascending
+    val sortedJsLs: Seq[Seq[Any]] = Seq(
+      Seq(2, "c"),
+      Seq(1, "a"),
+      Seq(1, "b"),
+      Seq(0, "a"),
+      Seq(0, "b")
+    )
+
+    val ascendingLs: Seq[Boolean] = Seq(false, true)
+    val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs))
+
+    resultJsLs.toString() must_== sortedJsLs.toString
+  }
+
+  "test order by primitive type with short ascending list" >> {
+    val jsLs: Seq[Seq[Any]] = Seq(
+      Seq(0, "a"),
+      Seq(1, "b"),
+      Seq(0, "b"),
+      Seq(1, "a"),
+      Seq(2, "c"),
+      Seq(1, "c"),
+      Seq(1, "d"),
+      Seq(1, "f"),
+      Seq(1, "e")
+    )
+
+    // number descending, string ascending(default)
+    val sortedJsLs: Seq[Seq[Any]] = Seq(
+      Seq(2, "c"),
+      Seq(1, "a"),
+      Seq(1, "b"),
+      Seq(1, "c"),
+      Seq(1, "d"),
+      Seq(1, "e"),
+      Seq(1, "f"),
+      Seq(0, "a"),
+      Seq(0, "b")
+    )
+
+    val ascendingLs: Seq[Boolean] = Seq(false)
+    val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs))
+
+    resultJsLs.toString() must_== sortedJsLs.toString
+  }
+}
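
The hand-rolled Ordering above and SeqMultiOrdering agree on semantics: a false flag compares the operands swapped (descending), and when the flag list is shorter than the row the remaining positions fall back to ascending, as the third example exercises. In sketch form:

    import org.apache.s2graph.core.OrderingUtil._
    import org.apache.s2graph.core.SeqMultiOrdering

    val rows: Seq[Seq[Any]] = Seq(Seq(1, "b"), Seq(0, "c"), Seq(1, "a"))
    // one flag: first key descending, remaining keys default to ascending
    rows.sorted(new SeqMultiOrdering[Any](Seq(false)))
    // => Seq(Seq(1, "a"), Seq(1, "b"), Seq(0, "c"))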

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala b/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala
new file mode 100644
index 0000000..99d3ba4
--- /dev/null
+++ b/spark/src/main/scala/org/apache/s2graph/spark/config/S2ConfigFactory.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.spark.config
+
+import com.typesafe.config.{Config, ConfigFactory}
+
+object S2ConfigFactory {
+  lazy val config: Config = _load
+
+  @deprecated("do not call explicitly. use config", "0.0.6")
+  def load(): Config = {
+    _load
+  }
+
+  def _load: Config = {
+    // default configuration file name : application.conf
+    val sysConfig = ConfigFactory.parseProperties(System.getProperties)
+
+    lazy val phase = if (!sysConfig.hasPath("phase")) "alpha" else sysConfig.getString("phase")
+    sysConfig.withFallback(ConfigFactory.parseResourcesAnySyntax(s"$phase.conf")).withFallback(ConfigFactory.load())
+  }
+}
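
Resolution order: JVM system properties win, then $phase.conf (alpha.conf when no phase property is set), then the application.conf defaults. A usage sketch (the property value and config key are hypothetical):

    // launched with -Dphase=real, so real.conf overlays application.conf
    val config = S2ConfigFactory.config
    val quorum = config.getString("hbase.zookeeper.quorum")  // hypothetical key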

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala
new file mode 100644
index 0000000..b8e73ca
--- /dev/null
+++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/HashMapParam.scala
@@ -0,0 +1,55 @@
+package org.apache.s2graph.spark.spark
+
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.{AccumulableParam, SparkConf}
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+
+/*
+ * Allows a mutable HashMap[String, Int] to be used as an accumulator in Spark.
+ * Whenever we try to put (k, v2) into an accumulator that already contains (k, v1), the result
+ * will be a HashMap containing (k, v1 + v2).
+ *
+ * Would have been nice to extend GrowableAccumulableParam instead of redefining everything, but it's
+ * private to the spark package.
+ */
+
+class HashMapParam[K, V](op: (V, V) => V) extends AccumulableParam[MutableHashMap[K, V], (K, V)] {
+  type MapType = MutableHashMap[K, V]
+  type ElemType = (K, V)
+
+  def addAccumulator(acc: MapType, elem: ElemType): MapType = {
+    val (k1, v1) = elem
+    acc += acc.find(_._1 == k1).map {
+      case (k2, v2) => k2 -> op(v1, v2)
+    }.getOrElse(elem)
+
+    acc
+  }
+
+  /*
+   * This method is allowed to modify and return the first value for efficiency.
+   *
+   * @see org.apache.spark.GrowableAccumulableParam.addInPlace(r1: R, r2: R): R
+   */
+  def addInPlace(acc1: MapType, acc2: MapType): MapType = {
+    acc2.foreach(elem => addAccumulator(acc1, elem))
+    acc1
+  }
+
+  /*
+   * @see org.apache.spark.GrowableAccumulableParam.zero(initialValue: R): R
+   */
+  def zero(initialValue: MapType): MapType = {
+    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
+    val copy = ser.deserialize[MapType](ser.serialize(initialValue))
+    copy.clear()
+    copy
+  }
+}
+
+object HashMapParam {
+  def apply[K, V](op: (V, V) => V): HashMapParam[K, V] = {
+    new HashMapParam[K, V](op)
+  }
+}
\ No newline at end of file
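
A usage sketch against the Spark 1.x accumulator API this class targets (the word-count RDD is hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.s2graph.spark.spark.HashMapParam
    import scala.collection.mutable.{HashMap => MutableHashMap}

    def wordCounts(sc: SparkContext): MutableHashMap[String, Long] = {
      // the merge op sums values accumulated under the same key
      val acc = sc.accumulable(MutableHashMap.empty[String, Long])(HashMapParam[String, Long](_ + _))
      sc.parallelize(Seq("a", "b", "a")).foreach(w => acc += (w -> 1L))
      acc.value  // MutableHashMap(a -> 2, b -> 1)
    }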

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala
new file mode 100644
index 0000000..651d78b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/RDDUtil.scala
@@ -0,0 +1,13 @@
+package org.apache.s2graph.spark.spark
+
+import org.apache.spark.rdd.RDD
+
+object RDDUtil {
+  def rddIsNonEmpty[T](rdd: RDD[T]): Boolean = {
+    !rddIsEmpty(rdd)
+  }
+
+  def rddIsEmpty[T](rdd: RDD[T]): Boolean = {
+    rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _)
+  }
+}
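
rddIsEmpty avoids a full count: each partition contributes a single "I have no elements" flag and the flags are AND-reduced, so at most one element per partition is inspected. Usage sketch (assuming an ambient SparkContext sc; the data is hypothetical):

    import org.apache.s2graph.spark.spark.RDDUtil

    val rdd = sc.parallelize(Seq(1, 2, 3))
    if (RDDUtil.rddIsNonEmpty(rdd)) println("has data")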

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala
new file mode 100644
index 0000000..2e11904
--- /dev/null
+++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/SparkApp.scala
@@ -0,0 +1,120 @@
+package org.apache.s2graph.spark.spark
+
+import kafka.serializer.StringDecoder
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka.{KafkaUtils, StreamHelper}
+import org.apache.spark.streaming.{Duration, StreamingContext}
+import org.apache.spark.{Accumulable, Logging, SparkConf}
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+
+trait SparkApp extends Logging {
+  type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)]
+
+  protected def args: Array[String] = _args
+
+  private var _args: Array[String] = _
+
+  private var streamHelper: StreamHelper = _
+
+  // should be implemented by the derived class
+  def run()
+
+  def getArgs(index: Int) = args(index)
+
+  def main(args: Array[String]) {
+    _args = args
+    run()
+  }
+
+  def validateArgument(argNames: String*): Unit = {
+    if (args == null || args.length < argNames.length) {
+      System.err.println(s"Usage: ${getClass.getName} " + argNames.map(s => s"<$s>").mkString(" "))
+      System.exit(1)
+    }
+  }
+
+  def buildKafkaGroupId(topic: String, ext: String): String = {
+    val phase = System.getProperty("phase")
+
+    // use first topic for group id
+    val groupId = s"${topic.split(',')(0)}_$ext"
+
+    groupId + {
+      phase match {
+        case "real" | "production" => ""
+        case x => s"_$x"
+      }
+    }
+  }
+
+  def getStreamHelper(kafkaParam: Map[String, String]): StreamHelper = {
+    if (streamHelper == null) {
+      this.synchronized {
+        if (streamHelper == null) {
+          streamHelper = StreamHelper(kafkaParam)
+        }
+      }
+    }
+    streamHelper
+  }
+
+  def sparkConf(jobName: String): SparkConf = {
+    val conf = new SparkConf()
+    conf.setAppName(jobName)
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.streaming.unpersist", "true")
+    conf
+  }
+
+  def streamingContext(sparkConf: SparkConf, interval: Duration, checkPoint: Option[String] = None) = {
+    val ssc = new StreamingContext(sparkConf, interval)
+    checkPoint.foreach { dir =>
+      ssc.checkpoint(dir)
+    }
+
+    // for watch tower
+    ssc.addStreamingListener(new SubscriberListener(ssc))
+
+    ssc
+  }
+
+  def createKafkaPairStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[(String, String)] = {
+    val topicMap = topics.split(",").map((_, 1)).toMap
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
+    numPartition.map(n =>
+      stream.repartition(n)
+    ).getOrElse(stream)
+  }
+
+  def createKafkaValueStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[String] = {
+    createKafkaPairStream(ssc, kafkaParam, topics, numPartition).map(_._2)
+  }
+
+  def createKafkaPairStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[(String, String)] = {
+    // wait until all executors are running
+    Stream.continually(ssc.sparkContext.getExecutorStorageStatus).takeWhile(_.length < receiverCount).foreach { arr =>
+      Thread.sleep(100)
+    }
+    Thread.sleep(1000)
+
+    val topicMap = topics.split(",").map((_, 1)).toMap
+
+    val stream = {
+      val streams = {
+        (1 to receiverCount) map { _ =>
+          KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
+        }
+      }
+      ssc.union(streams)
+    }
+    numPartition.map(n =>
+      stream.repartition(n)
+    ).getOrElse(stream)
+  }
+
+  def createKafkaValueStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[String] = {
+    createKafkaPairStreamMulti(ssc, kafkaParam, topics, receiverCount, numPartition).map(_._2)
+  }
+}
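
A concrete job built on the trait (job, argument, and topic names are hypothetical; the Kafka parameters are those of the old high-level consumer that KafkaUtils.createStream expects):

    import org.apache.spark.streaming.Seconds

    object EchoJob extends SparkApp {
      override def run(): Unit = {
        validateArgument("zkQuorum", "topics")
        val Array(zkQuorum, topics) = args
        val kafkaParam = Map(
          "zookeeper.connect" -> zkQuorum,
          "group.id" -> buildKafkaGroupId(topics, "echo"))
        val ssc = streamingContext(sparkConf("EchoJob"), Seconds(10))
        createKafkaValueStream(ssc, kafkaParam, topics).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }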

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala
new file mode 100644
index 0000000..f23debb
--- /dev/null
+++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/SubscriberListener.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.spark.spark
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStarted, StreamingListenerReceiverStopped}
+
+class SubscriberListener(ssc: StreamingContext) extends StreamingListener with Logging {
+  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
+    logInfo("onReceiverError")
+  }
+
+  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+    logInfo("onReceiverStarted")
+  }
+
+  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
+    logInfo("onReceiverStopped")
+    ssc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala b/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala
new file mode 100644
index 0000000..f28b9cf
--- /dev/null
+++ b/spark/src/main/scala/org/apache/s2graph/spark/spark/WithKafka.scala
@@ -0,0 +1,69 @@
+package org.apache.s2graph.spark.spark
+
+import java.util.Properties
+
+import kafka.producer.{Producer, ProducerConfig}
+
+trait WithKafka {
+  def kafkaConf(brokerList: String) = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    props.put("request.required.acks", "0")
+    props.put("producer.type", "async")
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("compression.codec", "1")
+    props.put("message.send.max.retries", "3")
+    props.put("batch.num.messages", "1000")
+    new ProducerConfig(props)
+  }
+
+  def producerConfig(brokerList: String, requireAcks: String = "1", producerType: String = "sync") = {
+    val props = new Properties()
+    props.setProperty("metadata.broker.list", brokerList)
+    props.setProperty("request.required.acks", requireAcks)
+    props.setProperty("producer.type", producerType)
+    props.setProperty("serializer.class", "kafka.serializer.StringEncoder")
+    props.setProperty("compression.codec", "snappy")
+    props.setProperty("message.send.max.retries", "1")
+    new ProducerConfig(props)
+  }
+
+  def getProducer[K, V](config: ProducerConfig): Producer[K, V] = {
+    new Producer[K, V](config)
+  }
+
+  def getProducer[K, V](brokers: String): Producer[K, V] = {
+    getProducer(producerConfig(brokers))
+  }
+
+  /**
+   * Kafka DefaultPartitioner
+   * @param k
+   * @param n
+   * @return
+   */
+  def getPartKey(k: Any, n: Int): Int = {
+    kafka.utils.Utils.abs(k.hashCode()) % n
+  }
+
+  def makeKafkaGroupId(topic: String, ext: String): String = {
+    val phase = System.getProperty("phase")
+
+    var groupId = s"${topic}_$ext"
+
+    groupId += {
+      System.getProperty("spark.master") match {
+        case x if x.startsWith("local") => "_local"
+        case _ => ""
+      }
+    }
+
+    groupId += {
+      phase match {
+        case "alpha" => "_alpha"
+        case _ => ""
+      }}
+
+    groupId
+  }
+}
\ No newline at end of file
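
A send sketch with the Kafka 0.8 producer API these configs target (broker address and topic are hypothetical):

    import kafka.producer.KeyedMessage

    object ProducerExample extends WithKafka {
      def main(args: Array[String]): Unit = {
        val producer = getProducer[String, String]("localhost:9092")  // hypothetical broker
        producer.send(new KeyedMessage[String, String]("events", "key", "value"))
        producer.close()
      }
    }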

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala b/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala
index 56be543..768bedb 100644
--- a/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala
+++ b/spark/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDFunctions.scala
@@ -6,9 +6,6 @@ import org.apache.spark.rdd.RDD
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
-/**
- * Created by hsleep([email protected]) on 15. 5. 6..
- */
 class KafkaRDDFunctions[T: ClassTag](self: RDD[T])
   extends Logging
   with Serializable

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala b/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala
index 782f87f..0baf397 100644
--- a/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/streaming/kafka/StreamHelper.scala
@@ -8,12 +8,8 @@ import kafka.serializer.Decoder
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.InputDStream
 import org.apache.spark.{Logging, SparkException}
-
 import scala.reflect.ClassTag
 
-/**
- * Created by hsleep([email protected]) on 15. 4. 22..
- */
 case class StreamHelper(kafkaParams: Map[String, String]) extends Logging {
   // helper for kafka zookeeper
   lazy val kafkaHelper = KafkaHelper(kafkaParams)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/s2/config/S2ConfigFactory.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/s2/config/S2ConfigFactory.scala b/spark/src/main/scala/s2/config/S2ConfigFactory.scala
deleted file mode 100644
index 7666cdc..0000000
--- a/spark/src/main/scala/s2/config/S2ConfigFactory.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package s2.config
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-/**
- * Created by alec on 15. 3. 4..
- */
-
-/**
- * A config factory that loads the matching phase.conf file for the current phase
- */
-object S2ConfigFactory {
-  lazy val config: Config = _load
-
-  @deprecated("do not call explicitly. use config", "0.0.6")
-  def load(): Config = {
-    _load
-  }
-
-  def _load: Config = {
-    // default configuration file name : application.conf
-    val sysConfig = ConfigFactory.parseProperties(System.getProperties)
-
-    lazy val phase = if (!sysConfig.hasPath("phase")) "alpha" else sysConfig.getString("phase")
-    sysConfig.withFallback(ConfigFactory.parseResourcesAnySyntax(s"$phase.conf")).withFallback(ConfigFactory.load())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/s2/spark/HashMapParam.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/s2/spark/HashMapParam.scala b/spark/src/main/scala/s2/spark/HashMapParam.scala
deleted file mode 100644
index a84c687..0000000
--- a/spark/src/main/scala/s2/spark/HashMapParam.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package s2.spark
-
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.{AccumulableParam, SparkConf}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-
-/*
- * Allows a mutable HashMap[String, Int] to be used as an accumulator in Spark.
- * Whenever we try to put (k, v2) into an accumulator that already contains (k, v1), the result
- * will be a HashMap containing (k, v1 + v2).
- *
- * Would have been nice to extend GrowableAccumulableParam instead of redefining everything, but it's
- * private to the spark package.
- */
-
-class HashMapParam[K, V](op: (V, V) => V) extends AccumulableParam[MutableHashMap[K, V], (K, V)] {
-  type MapType = MutableHashMap[K, V]
-  type ElemType = (K, V)
-
-  def addAccumulator(acc: MapType, elem: ElemType): MapType = {
-    val (k1, v1) = elem
-    acc += acc.find(_._1 == k1).map {
-      case (k2, v2) => k2 -> op(v1, v2)
-    }.getOrElse(elem)
-
-    acc
-  }
-
-  /*
-   * This method is allowed to modify and return the first value for efficiency.
-   *
-   * @see org.apache.spark.GrowableAccumulableParam.addInPlace(r1: R, r2: R): R
-   */
-  def addInPlace(acc1: MapType, acc2: MapType): MapType = {
-    acc2.foreach(elem => addAccumulator(acc1, elem))
-    acc1
-  }
-
-  /*
-   * @see org.apache.spark.GrowableAccumulableParam.zero(initialValue: R): R
-   */
-  def zero(initialValue: MapType): MapType = {
-    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
-    val copy = ser.deserialize[MapType](ser.serialize(initialValue))
-    copy.clear()
-    copy
-  }
-}
-
-object HashMapParam {
-  def apply[K, V](op: (V, V) => V): HashMapParam[K, V] = {
-    new HashMapParam[K, V](op)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/s2/spark/RDDUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/s2/spark/RDDUtil.scala b/spark/src/main/scala/s2/spark/RDDUtil.scala
deleted file mode 100644
index c5a371a..0000000
--- a/spark/src/main/scala/s2/spark/RDDUtil.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package s2.spark
-
-import org.apache.spark.rdd.RDD
-
-/**
- * Created by hsleep([email protected]) on 14. 12. 23..
- */
-object RDDUtil {
-  def rddIsNonEmpty[T](rdd: RDD[T]): Boolean = {
-    !rddIsEmpty(rdd)
-  }
-
-  def rddIsEmpty[T](rdd: RDD[T]): Boolean = {
-    rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/s2/spark/SparkApp.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/s2/spark/SparkApp.scala b/spark/src/main/scala/s2/spark/SparkApp.scala
deleted file mode 100644
index e27b7ec..0000000
--- a/spark/src/main/scala/s2/spark/SparkApp.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-package s2.spark
-
-import kafka.serializer.StringDecoder
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.kafka.{KafkaUtils, StreamHelper}
-import org.apache.spark.streaming.{Duration, StreamingContext}
-import org.apache.spark.{Accumulable, Logging, SparkConf}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-
-
-/**
- * Created by hsleep([email protected]) on 14. 12. 26..
- */
-trait SparkApp extends Logging {
-  type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)]
-
-  protected def args: Array[String] = _args
-
-  private var _args: Array[String] = _
-
-  private var streamHelper: StreamHelper = _
-
-  // should implement in derived class
-  def run()
-
-  def getArgs(index: Int) = args(index)
-
-  def main(args: Array[String]) {
-    _args = args
-    run()
-  }
-
-  def validateArgument(argNames: String*): Unit = {
-    if (args == null || args.length < argNames.length) {
-      System.err.println(s"Usage: ${getClass.getName} " + argNames.map(s => s"<$s>").mkString(" "))
-      System.exit(1)
-    }
-  }
-
-  def buildKafkaGroupId(topic: String, ext: String): String = {
-    val phase = System.getProperty("phase")
-
-    // use first topic for group id
-    val groupId = s"${topic.split(',')(0)}_$ext"
-
-    groupId + {
-      phase match {
-        case "real" | "production" => ""
-        case x => s"_$x"
-      }
-    }
-  }
-
-  def getStreamHelper(kafkaParam: Map[String, String]): StreamHelper = {
-    if (streamHelper == null) {
-      this.synchronized {
-        if (streamHelper == null) {
-          streamHelper = StreamHelper(kafkaParam)
-        }
-      }
-    }
-    streamHelper
-  }
-
-  def sparkConf(jobName: String): SparkConf = {
-    val conf = new SparkConf()
-    conf.setAppName(jobName)
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.streaming.unpersist", "true")
-    conf
-  }
-
-  def streamingContext(sparkConf: SparkConf, interval: Duration, checkPoint: Option[String] = None) = {
-    val ssc = new StreamingContext(sparkConf, interval)
-    checkPoint.foreach { dir =>
-      ssc.checkpoint(dir)
-    }
-
-    // for watch tower
-    ssc.addStreamingListener(new SubscriberListener(ssc))
-
-    ssc
-  }
-
-  def createKafkaPairStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[(String, String)] = {
-    val topicMap = topics.split(",").map((_, 1)).toMap
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
-    numPartition.map(n =>
-      stream.repartition(n)
-    ).getOrElse(stream)
-  }
-
-  def createKafkaValueStream(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, numPartition: Option[Int] = None): DStream[String] = {
-    createKafkaPairStream(ssc, kafkaParam, topics, numPartition).map(_._2)
-  }
-
-  def createKafkaPairStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[(String, String)] = {
-    // wait until all executor is running
-    Stream.continually(ssc.sparkContext.getExecutorStorageStatus).takeWhile(_.length < receiverCount).foreach { arr =>
-      Thread.sleep(100)
-    }
-    Thread.sleep(1000)
-
-    val topicMap = topics.split(",").map((_, 1)).toMap
-
-    val stream = {
-      val streams = {
-        (1 to receiverCount) map { _ =>
-          KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
-        }
-      }
-      ssc.union(streams)
-    }
-    numPartition.map(n =>
-      stream.repartition(n)
-    ).getOrElse(stream)
-  }
-
-  def createKafkaValueStreamMulti(ssc: StreamingContext, kafkaParam: Map[String, String], topics: String, receiverCount: Int, numPartition: Option[Int] = None): DStream[String] = {
-    createKafkaPairStreamMulti(ssc, kafkaParam, topics, receiverCount, numPartition).map(_._2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/s2/spark/SubscriberListener.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/s2/spark/SubscriberListener.scala b/spark/src/main/scala/s2/spark/SubscriberListener.scala
deleted file mode 100644
index 9e156a7..0000000
--- a/spark/src/main/scala/s2/spark/SubscriberListener.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.spark
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStarted, StreamingListenerReceiverStopped}
-
-/**
- * Created by hsleep([email protected]) on 15. 1. 8..
- */
-
-class SubscriberListener(ssc: StreamingContext) extends StreamingListener with Logging {
-  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
-    logInfo("onReceiverError")
-  }
-
-  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
-    logInfo("onReceiverStarted")
-  }
-
-  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
-    logInfo("onReceiverStopped")
-    ssc.stop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/main/scala/s2/spark/WithKafka.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/s2/spark/WithKafka.scala b/spark/src/main/scala/s2/spark/WithKafka.scala
deleted file mode 100644
index 9bd5944..0000000
--- a/spark/src/main/scala/s2/spark/WithKafka.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package s2.spark
-
-import java.util.Properties
-
-import kafka.producer.{Producer, ProducerConfig}
-
-trait WithKafka {
-  def kafkaConf(brokerList: String) = {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    props.put("request.required.acks", "0")
-    props.put("producer.type", "async")
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("compression.codec", "1")
-    props.put("message.send.max.retries", "3")
-    props.put("batch.num.messages", "1000")
-    new ProducerConfig(props)
-  }
-
-  def producerConfig(brokerList: String, requireAcks: String = "1", producerType: String = "sync") = {
-    val props = new Properties()
-    props.setProperty("metadata.broker.list", brokerList)
-    props.setProperty("request.required.acks", requireAcks)
-    props.setProperty("producer.type", producerType)
-    props.setProperty("serializer.class", "kafka.serializer.StringEncoder")
-    props.setProperty("compression.codec", "snappy")
-    props.setProperty("message.send.max.retries", "1")
-    new ProducerConfig(props)
-  }
-
-  def getProducer[K, V](config: ProducerConfig): Producer[K, V] = {
-    new Producer[K, V](config)
-  }
-
-  def getProducer[K, V](brokers: String): Producer[K, V] = {
-    getProducer(producerConfig(brokers))
-  }
-
-  /**
-   * Kafka DefaultPartitioner
-   * @param k
-   * @param n
-   * @return
-   */
-  def getPartKey(k: Any, n: Int): Int = {
-    kafka.utils.Utils.abs(k.hashCode()) % n
-  }
-
-  def makeKafkaGroupId(topic: String, ext: String): String = {
-    val phase = System.getProperty("phase")
-
-    var groupId = s"${topic}_$ext"
-
-    groupId += {
-      System.getProperty("spark.master") match {
-        case x if x.startsWith("local") => "_local"
-        case _ => ""
-      }
-    }
-
-    groupId += {
-      phase match {
-        case "alpha" => "_alpha"
-        case _ => ""
-      }}
-
-    groupId
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala b/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala
new file mode 100644
index 0000000..2d48a89
--- /dev/null
+++ b/spark/src/test/scala/org/apache/s2graph/spark/SparkAppTest.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.spark
+
+import org.apache.s2graph.spark.spark.SparkApp
+import org.scalatest.{FunSuite, Matchers}
+
+object TestApp extends SparkApp {
+  override def run(): Unit = {
+    validateArgument("topic", "phase")
+  }
+}
+
+class SparkAppTest extends FunSuite with Matchers {
+  test("parse argument") {
+    TestApp.main(Array("s2graphInreal", "real"))
+    TestApp.getArgs(0) shouldEqual "s2graphInreal"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala b/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala
new file mode 100644
index 0000000..df6c7e5
--- /dev/null
+++ b/spark/src/test/scala/org/apache/s2graph/spark/TestStreamingSpec.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.spark
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.specs2.mutable.Specification
+import org.specs2.specification.BeforeAfterAll
+
+class TestStreamingSpec extends Specification with BeforeAfterAll {
+  private val master = "local[2]"
+  private val appName = "test_streaming"
+  private val batchDuration = Seconds(1)
+
+  private var sc: SparkContext = _
+  private var ssc: StreamingContext = _
+
+  override def beforeAll(): Unit = {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    ssc = new StreamingContext(conf, batchDuration)
+
+    sc = ssc.sparkContext
+  }
+
+  override def afterAll(): Unit = {
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/test/scala/s2/spark/SparkAppTest.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/s2/spark/SparkAppTest.scala b/spark/src/test/scala/s2/spark/SparkAppTest.scala
deleted file mode 100644
index 6bf2482..0000000
--- a/spark/src/test/scala/s2/spark/SparkAppTest.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-package s2.spark
-
-import org.scalatest.{FunSuite, Matchers}
-
-/**
- * Created by alec.k on 14. 12. 26..
- */
-
-object TestApp extends SparkApp {
-  // a function the inheriting class must implement
-  override def run(): Unit = {
-    validateArgument("topic", "phase")
-  }
-}
-
-class SparkAppTest extends FunSuite with Matchers {
-  test("parse argument") {
-    TestApp.main(Array("s2graphInreal", "real"))
-    TestApp.getArgs(0) shouldEqual "s2graphInreal"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/spark/src/test/scala/s2/spark/TestStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/s2/spark/TestStreamingSpec.scala b/spark/src/test/scala/s2/spark/TestStreamingSpec.scala
deleted file mode 100644
index bf15618..0000000
--- a/spark/src/test/scala/s2/spark/TestStreamingSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package s2.spark
-
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.specs2.mutable.Specification
-import org.specs2.specification.BeforeAfterAll
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 17..
- */
-class TestStreamingSpec extends Specification with BeforeAfterAll {
-  private val master = "local[2]"
-  private val appName = "test_streaming"
-  private val batchDuration = Seconds(1)
-
-  private var sc: SparkContext = _
-  private var ssc: StreamingContext = _
-
-  override def beforeAll(): Unit = {
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    ssc = new StreamingContext(conf, batchDuration)
-
-    sc = ssc.sparkContext
-  }
-
-  override def afterAll(): Unit = {
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-}
