http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
new file mode 100644
index 0000000..6b3babb
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala

package org.apache.s2graph.core

import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
import org.scalatest.{FunSuite, Matchers}

/** Round-trip tests for JSONParser: InnerVal -> JsValue -> InnerVal, per schema version. */
class JsonParserTest extends FunSuite with Matchers with TestCommon with JSONParser {

  import InnerVal._
  import types.HBaseType._

  // (innerVal, dataType) fixtures for each schema version.
  // FLOAT/DOUBLE fixtures are only generated for VERSION2 (not supported in VERSION1).
  val innerValsPerVersion = for {
    version <- List(VERSION2, VERSION1)
  } yield {
    val commonVals = List(
      (InnerVal.withStr("ABC123", version), STRING),
      (InnerVal.withNumber(23, version), BYTE),
      (InnerVal.withNumber(23, version), INT),
      (InnerVal.withNumber(Int.MaxValue, version), INT),
      (InnerVal.withNumber(Int.MinValue, version), INT),
      (InnerVal.withNumber(Long.MaxValue, version), LONG),
      (InnerVal.withNumber(Long.MinValue, version), LONG),
      (InnerVal.withBoolean(true, version), BOOLEAN)
    )
    val floatingVals =
      if (version == VERSION2) {
        List(
          (InnerVal.withDouble(Double.MaxValue, version), DOUBLE),
          (InnerVal.withDouble(Double.MinValue, version), DOUBLE),
          (InnerVal.withDouble(0.1, version), DOUBLE),
          (InnerVal.withFloat(Float.MinValue, version), FLOAT),
          (InnerVal.withFloat(Float.MaxValue, version), FLOAT),
          (InnerVal.withFloat(0.9f, version), FLOAT)
        )
      } else {
        List.empty[(InnerValLike, String)]
      }
    (commonVals ++ floatingVals, version)
  }

  /**
   * Asserts that every (InnerVal, dataType) pair survives a round trip through
   * innerValToJsValue / jsValueToInnerVal for the given schema version.
   */
  def testInnerValToJsValue(innerValWithDataTypes: Seq[(InnerValLike, String)],
                            version: String) = {
    for {
      (innerVal, dataType) <- innerValWithDataTypes
    } {
      val jsValueOpt = innerValToJsValue(innerVal, dataType)
      val decodedOpt = jsValueOpt.flatMap { jsValue =>
        jsValueToInnerVal(jsValue, dataType, version)
      }
      // Compare against Some(...) directly: the failure message then shows the
      // offending value instead of `false was not true`, and there is no unsafe
      // Option#get on a possibly-empty decode result.
      decodedOpt shouldBe Some(innerVal)
    }
  }

  test("innerValToJsValue returns a defined JsValue for a string InnerVal") {
    // Previously named "aa": it only printed the result and asserted nothing.
    val innerVal = InnerVal.withStr("abc", VERSION2)
    val jsValueOpt = innerValToJsValue(innerVal, "string")
    jsValueOpt shouldBe defined
  }

  test("JsValue <-> InnerVal with dataType") {
    for {
      (innerValWithDataTypes, version) <- innerValsPerVersion
    } {
      testInnerValToJsValue(innerValWithDataTypes, version)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala
new file mode 100644
index 0000000..e3f1ed5
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala

package org.apache.s2graph.core

import org.apache.s2graph.core.OrderingUtil._
import org.scalatest.{FunSuite, Matchers}
import play.api.libs.json.JsString

/**
 * Tests for multi-key orderings (SeqMultiOrdering / TupleMultiOrdering) where each
 * sort key carries its own ascending/descending flag.
 */
class OrderingUtilTest extends FunSuite with Matchers {

  test("test SeqMultiOrdering") {
    val input: Seq[Seq[Any]] = Seq(
      Seq(0, "a"),
      Seq(0, "b"),
      Seq(1, "a"),
      Seq(1, "b"),
      Seq(2, "c")
    )

    // Expected order: number descending, string ascending.
    val expected: Seq[Seq[Any]] = Seq(
      Seq(2, "c"),
      Seq(1, "a"),
      Seq(1, "b"),
      Seq(0, "a"),
      Seq(0, "b")
    )

    val ascendingLs: Seq[Boolean] = Seq(false, true)
    val result = input.sorted(new SeqMultiOrdering[Any](ascendingLs))

    // Compare collections directly instead of via toString(): Seq equality is
    // element-wise and the failure output shows the actual structural diff.
    result should equal(expected)
  }

  test("test tuple 1 TupleMultiOrdering") {
    val input: Seq[(Any, Any, Any, Any)] = Seq(
      (0, None, None, None),
      (0, None, None, None),
      (1, None, None, None),
      (1, None, None, None),
      (2, None, None, None)
    )

    // Only the first key is significant here: number descending.
    val expected: Seq[(Any, Any, Any, Any)] = Seq(
      (2, None, None, None),
      (1, None, None, None),
      (1, None, None, None),
      (0, None, None, None),
      (0, None, None, None)
    )

    val ascendingLs: Seq[Boolean] = Seq(false)
    val result = input.sorted(TupleMultiOrdering[Any](ascendingLs))

    result should equal(expected)
  }

  test("test tuple 2 TupleMultiOrdering") {
    val input: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", None, None),
      (0, "b", None, None),
      (1, "a", None, None),
      (1, "b", None, None),
      (2, "c", None, None)
    )

    // Expected order: number descending, string ascending.
    val expected: Seq[(Any, Any, Any, Any)] = Seq(
      (2, "c", None, None),
      (1, "a", None, None),
      (1, "b", None, None),
      (0, "a", None, None),
      (0, "b", None, None)
    )

    val ascendingLs: Seq[Boolean] = Seq(false, true)
    val result = input.sorted(TupleMultiOrdering[Any](ascendingLs))

    result should equal(expected)
  }

  test("test tuple 3 TupleMultiOrdering") {
    // Long literals use uppercase `L`: lowercase `l` is deprecated and easily
    // misread as the digit 1.
    val input: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 0L, None),
      (0, "a", 1L, None),
      (0, "b", 0L, None),
      (1, "a", 0L, None),
      (1, "b", 0L, None),
      (2, "c", 0L, None)
    )

    // Expected order: first two keys ascending, long key descending.
    val expected: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 1L, None),
      (0, "a", 0L, None),
      (0, "b", 0L, None),
      (1, "a", 0L, None),
      (1, "b", 0L, None),
      (2, "c", 0L, None)
    )

    val ascendingLs: Seq[Boolean] = Seq(true, true, false)
    val result = input.sorted(TupleMultiOrdering[Any](ascendingLs))

    result should equal(expected)
  }

  test("test tuple 4 TupleMultiOrdering") {
    val input: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 0L, JsString("a")),
      (0, "a", 0L, JsString("b")),
      (0, "a", 1L, JsString("a")),
      (0, "b", 0L, JsString("b")),
      (1, "a", 0L, JsString("b")),
      (1, "b", 0L, JsString("b")),
      (2, "c", 0L, JsString("b"))
    )

    // Expected order: first three keys ascending, JsString key descending.
    val expected: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 0L, JsString("b")),
      (0, "a", 0L, JsString("a")),
      (0, "a", 1L, JsString("a")),
      (0, "b", 0L, JsString("b")),
      (1, "a", 0L, JsString("b")),
      (1, "b", 0L, JsString("b")),
      (2, "c", 0L, JsString("b"))
    )

    val ascendingLs: Seq[Boolean] = Seq(true, true, true, false)
    val result = input.sorted(TupleMultiOrdering[Any](ascendingLs))

    result should equal(expected)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
new file mode 100644
index 0000000..5b324e1
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala

package org.apache.s2graph.core

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.types.LabelWithDirection
import org.scalatest.{FunSuite, Matchers}

/** Tests for QueryParam cache-key generation: distinctness and determinism. */
class QueryParamTest extends FunSuite with Matchers with TestCommon {
//  val version = HBaseType.VERSION2
//  val testEdge = Management.toEdge(ts, "insert", "1", "10", labelNameV2, "out", Json.obj("is_blocked" -> true, "phone_number" -> "xxxx", "age" -> 20).toString)
//  test("EdgeTransformer toInnerValOpt") {
//
//    /** only labelNameV2 has string type output */
//    val jsVal = Json.arr(Json.arr("_to"), Json.arr("phone_number.$", "phone_number"), Json.arr("age.$", "age"))
//    val transformer = EdgeTransformer(queryParamV2, jsVal)
//    val convertedLs = transformer.transform(testEdge, None)
//
//    convertedLs(0).tgtVertex.innerId.toString == "10" shouldBe true
//    convertedLs(1).tgtVertex.innerId.toString == "phone_number.xxxx" shouldBe true
//    convertedLs(2).tgtVertex.innerId.toString == "age.20" shouldBe true
//    true
//  }

  // 1000 distinct byte-array requests; each array encodes its own index.
  val dummyRequests = for {
    id <- 0 until 1000
  } yield {
    Bytes.toBytes(id)
  }

  test("QueryParam toCacheKey bytes") {
    val startedAt = System.nanoTime()
    val queryParam = QueryParam(LabelWithDirection(1, 0))

    // Distinct inputs must produce distinct cache keys. Computing each key once
    // and checking for duplicates is O(n), replacing the previous O(n^2)
    // pairwise loop (~1M toCacheKey invocations) with identical semantics.
    val cacheKeys = dummyRequests.map(queryParam.toCacheKey)
    cacheKeys.distinct.size shouldBe cacheKeys.size

    // The same input must always hash to the same key (determinism).
    dummyRequests.foreach { bytes =>
      queryParam.toCacheKey(bytes) shouldBe queryParam.toCacheKey(bytes)
    }

    val duration = System.nanoTime() - startedAt
    println(s">> bytes: $duration")
  }

  test("QueryParam toCacheKey with variable params") {
    val startedAt = System.nanoTime()
    val queryParam = QueryParam(LabelWithDirection(1, 0))

    dummyRequests.foreach { bytes =>
      // Same limit parameters => same cache key.
      queryParam.limit(0, 10)
      val baseHash = queryParam.toCacheKey(bytes)
      baseHash shouldBe queryParam.toCacheKey(bytes)

      // Changing the offset must change the cache key.
      queryParam.limit(1, 10)
      val offsetHash = queryParam.toCacheKey(bytes)
      baseHash should not be offsetHash

      // Changing the limit must change the cache key.
      queryParam.limit(0, 10)
      val limitBaseHash = queryParam.toCacheKey(bytes)
      queryParam.limit(0, 11)
      val limitHash = queryParam.toCacheKey(bytes)
      limitBaseHash should not be limitHash
    }

    val duration = System.nanoTime() - startedAt
    println(s">> diff: $duration")
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
new file mode 100644
index 0000000..2c16972
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala

package org.apache.s2graph.core

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLikeWithTs, LabelWithDirection}

/** Shared fixtures for serialization/ordering tests: ids, InnerVal samples, index props. */
trait TestCommon {
  val ts = System.currentTimeMillis()
  val testServiceId = 1
  val testColumnId = 1
  val testLabelId = 1
  val testDir = GraphUtil.directions("out")
  val testOp = GraphUtil.operations("insert")
  val testLabelOrderSeq = LabelIndex.DefaultSeq
  val testLabelWithDir = LabelWithDirection(testLabelId, testDir)
  val labelMeta = LabelMeta

  // Byte-array comparison helpers using HBase's lexicographic comparator.
  def equalsTo(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) == 0

  def largerThan(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) > 0

  def lessThan(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) < 0

  def lessThanEqual(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) <= 0

  /** small/large value pairs used to build ordered index-property fixtures */
  import HBaseType.{VERSION1, VERSION2}
  private val tsValSmall = InnerVal.withLong(ts, VERSION1)
  private val tsValLarge = InnerVal.withLong(ts + 1, VERSION1)
  private val boolValSmall = InnerVal.withBoolean(false, VERSION1)
  private val boolValLarge = InnerVal.withBoolean(true, VERSION1)
  private val doubleValSmall = InnerVal.withDouble(-0.1, VERSION1)
  private val doubleValLarge = InnerVal.withDouble(0.1, VERSION1)
  private val toSeq = LabelMeta.toSeq.toInt
  private val toVal = InnerVal.withLong(Long.MinValue, VERSION1)

  private val tsValSmallV2 = InnerVal.withLong(ts, VERSION2)
  private val tsValLargeV2 = InnerVal.withLong(ts + 1, VERSION2)
  private val boolValSmallV2 = InnerVal.withBoolean(false, VERSION2)
  private val boolValLargeV2 = InnerVal.withBoolean(true, VERSION2)
  private val doubleValSmallV2 = InnerVal.withDouble(-0.1, VERSION2)
  private val doubleValLargeV2 = InnerVal.withDouble(0.1, VERSION2)
  private val toValV2 = InnerVal.withLong(Long.MinValue, VERSION2)

  // Int samples around the extremes, the byte boundaries (+-127/128) and zero.
  val intVals = (Int.MinValue until Int.MinValue + 10) ++ (-129 to -126) ++ (-1 to 1) ++ (126 to 129) ++
    (Int.MaxValue - 10 until Int.MaxValue)
  val intInnerVals = intVals.map { v => InnerVal.withNumber(BigDecimal(v), VERSION1) }

  val intInnerValsV2 = intVals.map { v => InnerVal.withNumber(BigDecimal(v), VERSION2) }

  val stringVals = List("abc", "abd", "ac", "aca", "b")
  val stringInnerVals = stringVals.map { s => InnerVal.withStr(s, VERSION1) }
  val stringInnerValsV2 = stringVals.map { s => InnerVal.withStr(s, VERSION2) }

  // Numeric samples around Long and Int extremes.
  val numVals = (Long.MinValue until Long.MinValue + 10).map(BigDecimal(_)) ++
    (Int.MinValue until Int.MinValue + 10).map(BigDecimal(_)) ++
    (Int.MaxValue - 10 until Int.MaxValue).map(BigDecimal(_)) ++
    (Long.MaxValue - 10 until Long.MaxValue).map(BigDecimal(_))
  val numInnerVals = numVals.map { n => InnerVal.withLong(n.toLong, VERSION1) }
  val numInnerValsV2 = numVals.map { n => InnerVal.withNumber(n, VERSION2) }

  // Double samples spread across the representable range plus a few dense bands.
  val doubleStep = Double.MaxValue / 5
  val doubleVals = (Double.MinValue until 0 by doubleStep).map(BigDecimal(_)) ++
    (-9999.9 until -9994.1 by 1.1).map(BigDecimal(_)) ++
    (-128.0 until 128.0 by 1.2).map(BigDecimal(_)) ++
    (129.0 until 142.0 by 1.1).map(BigDecimal(_)) ++
    (doubleStep until Double.MaxValue by doubleStep).map(BigDecimal(_))
  val doubleInnerVals = doubleVals.map { d => InnerVal.withDouble(d.toDouble, VERSION1) }
  val doubleInnerValsV2 = doubleVals.map { d => InnerVal.withDouble(d.toDouble, VERSION2) }

  /** version 1 string order is broken */
  val idxPropsLs = Seq(
    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ac", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ab", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("b", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValSmall), (1 -> boolValLarge), (2 -> InnerVal.withStr("b", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValLarge), (1 -> boolValSmall), (2 -> InnerVal.withStr("a", VERSION1)), (toSeq -> toVal))
  ).map(seq => seq.map(t => t._1.toByte -> t._2))

  val idxPropsLsV2 = Seq(
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 -> doubleValSmallV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("ab", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("b", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValLargeV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValLargeV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2))
  ).map(seq => seq.map(t => t._1.toByte -> t._2))

  val idxPropsWithTsLs = idxPropsLs.map { idxProps =>
    idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }
  }
  val idxPropsWithTsLsV2 = idxPropsLsV2.map { idxProps =>
    idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }
  }

  // NOTE(review): a large block of commented-out legacy helpers (testOrder,
  // testOrderReverse, putToKeyValues) was removed here as dead code; recover it
  // from version control history if it is ever revived.
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
new file mode 100644
index 0000000..74b4197
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala

package org.apache.s2graph.core

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
import scalikejdbc.AutoSession

import scala.concurrent.ExecutionContext

/** Bootstraps graph/management plus test services, columns and labels for model tests. */
trait TestCommonWithModels {

  import InnerVal._
  import types.HBaseType._

  var graph: Graph = _
  var config: Config = _
  var management: Management = _

  /** Creates the Graph/Management instances and (re)creates all test metadata. */
  def initTests() = {
    config = ConfigFactory.load()
    graph = new Graph(config)(ExecutionContext.Implicits.global)
    management = new Management(graph)

    implicit val session = AutoSession

    // Labels must be removed before their services.
    deleteTestLabel()
    deleteTestService()

    createTestService()
    createTestLabel()
  }

  def zkQuorum = config.getString("hbase.zookeeper.quorum")

  // NOTE(review): identical to zkQuorum — presumably intentional aliasing, but
  // confirm a dedicated cluster setting was not intended here.
  def cluster = config.getString("hbase.zookeeper.quorum")

  implicit val session = AutoSession

  val serviceName = "_test_service"
  val serviceNameV2 = "_test_service_v2"
  val serviceNameV3 = "_test_service_v3"
  val serviceNameV4 = "_test_service_v4"

  val columnName = "user_id"
  val columnNameV2 = "user_id_v2"
  val columnNameV3 = "user_id_v3"
  val columnNameV4 = "user_id_v4"

  val columnType = "long"
  val columnTypeV2 = "long"
  val columnTypeV3 = "long"
  val columnTypeV4 = "long"

  // NOTE(review): "itme_id" looks like a typo for "item_id"; kept as-is because
  // the same literal is used consistently for create and lookup below.
  val tgtColumnName = "itme_id"
  val tgtColumnNameV2 = "item_id_v2"
  val tgtColumnNameV3 = "item_id_v3"
  val tgtColumnNameV4 = "item_id_v4"

  val tgtColumnType = "string"
  val tgtColumnTypeV2 = "string"
  val tgtColumnTypeV3 = "string"
  val tgtColumnTypeV4 = "string"

  val hTableName = "_test_cases"
  val preSplitSize = 0

  val labelName = "_test_label"
  val labelNameV2 = "_test_label_v2"
  val labelNameV3 = "_test_label_v3"
  val labelNameV4 = "_test_label_v4"

  val undirectedLabelName = "_test_label_undirected"
  val undirectedLabelNameV2 = "_test_label_undirected_v2"

  val testProps = Seq(
    Prop("affinity_score", "0.0", DOUBLE),
    Prop("is_blocked", "false", BOOLEAN),
    Prop("time", "0", INT),
    Prop("weight", "0", INT),
    Prop("is_hidden", "true", BOOLEAN),
    Prop("phone_number", "xxx-xxx-xxxx", STRING),
    Prop("score", "0.1", FLOAT),
    Prop("age", "10", INT)
  )
  val testIdxProps = Seq(Index("_PK", Seq("_timestamp", "affinity_score")))
  val consistencyLevel = "strong"
  val hTableTTL = None

  def createTestService() = {
    implicit val session = AutoSession
    management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
    management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
    management.createService(serviceNameV3, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
    management.createService(serviceNameV4, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
  }

  def deleteTestService() = {
    implicit val session = AutoSession
    Management.deleteService(serviceName)
    Management.deleteService(serviceNameV2)
    Management.deleteService(serviceNameV3)
    Management.deleteService(serviceNameV4)
  }

  def deleteTestLabel() = {
    implicit val session = AutoSession
    Management.deleteLabel(labelName)
    Management.deleteLabel(labelNameV2)
    // Fix: V3/V4 labels are created in createTestLabel but were never deleted
    // here, which breaks repeated runs against the same metadata store.
    Management.deleteLabel(labelNameV3)
    Management.deleteLabel(labelNameV4)
    Management.deleteLabel(undirectedLabelName)
    Management.deleteLabel(undirectedLabelNameV2)
  }

  def createTestLabel() = {
    implicit val session = AutoSession
    management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
      isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")

    management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
      isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")

    management.createLabel(labelNameV3, serviceNameV3, columnNameV3, columnTypeV3, serviceNameV3, tgtColumnNameV3, tgtColumnTypeV3,
      isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")

    management.createLabel(labelNameV4, serviceNameV4, columnNameV4, columnTypeV4, serviceNameV4, tgtColumnNameV4, tgtColumnTypeV4,
      isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4")

    management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")

    // NOTE(review): this label is registered under serviceName (not
    // serviceNameV2) unlike every other V2 entity — confirm this is intentional.
    management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
  }

  // Accessors are defs (not vals) and bypass the cache so each call observes
  // the current metadata state after create/delete.
  def service = Service.findByName(serviceName, useCache = false).get

  def serviceV2 = Service.findByName(serviceNameV2, useCache = false).get

  def serviceV3 = Service.findByName(serviceNameV3, useCache = false).get

  def serviceV4 = Service.findByName(serviceNameV4, useCache = false).get

  def column = ServiceColumn.find(service.id.get, columnName, useCache = false).get

  def columnV2 = ServiceColumn.find(serviceV2.id.get, columnNameV2, useCache = false).get

  def columnV3 = ServiceColumn.find(serviceV3.id.get, columnNameV3, useCache = false).get

  def columnV4 = ServiceColumn.find(serviceV4.id.get, columnNameV4, useCache = false).get

  def tgtColumn = ServiceColumn.find(service.id.get, tgtColumnName, useCache = false).get

  def tgtColumnV2 = ServiceColumn.find(serviceV2.id.get, tgtColumnNameV2, useCache = false).get

  def tgtColumnV3 = ServiceColumn.find(serviceV3.id.get, tgtColumnNameV3, useCache = false).get

  def tgtColumnV4 = ServiceColumn.find(serviceV4.id.get, tgtColumnNameV4, useCache = false).get

  def label = Label.findByName(labelName, useCache = false).get

  def labelV2 = Label.findByName(labelNameV2, useCache = false).get

  def labelV3 = Label.findByName(labelNameV3, useCache = false).get

  def labelV4 = Label.findByName(labelNameV4, useCache = false).get

  def undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get

  def undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get

  def dir = GraphUtil.directions("out")

  def op = GraphUtil.operations("insert")

  def labelOrderSeq = LabelIndex.DefaultSeq

  def labelWithDir = LabelWithDirection(label.id.get, dir)

  def labelWithDirV2 = LabelWithDirection(labelV2.id.get, dir)

  def labelWithDirV3 = LabelWithDirection(labelV3.id.get, dir)

  def labelWithDirV4 = LabelWithDirection(labelV4.id.get, dir)

  def queryParam = QueryParam(labelWithDir)

  def queryParamV2 = QueryParam(labelWithDirV2)

  def queryParamV3 = QueryParam(labelWithDirV3)

  def queryParamV4 = QueryParam(labelWithDirV4)

}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
new file mode 100644
index 0000000..5b32a0f
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala

package org.apache.s2graph.core.models

import org.apache.s2graph.core.TestCommonWithModels
import org.apache.s2graph.core.mysqls.Label
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

/** Smoke test for the Label model: lookup plus its derived metadata accessors. */
class ModelTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
  override def beforeAll(): Unit = {
    initTests()
  }

  override def afterAll(): Unit = {
    graph.shutdown()
  }

  // NOTE(review): a large block of commented-out HService/HLabel CRUD tests
  // (create/update/delete/read-by-index against the legacy H* models) was
  // removed here as dead code; recover it from version control history if the
  // legacy model tests are ever revived.

  test("test Label.findByName") {
    val labelOpt = Label.findByName(labelName, useCache = false)
    // `shouldBe defined` / `should not be empty` give readable failure
    // messages, unlike the previous `xs.size > 0 shouldBe true` pattern.
    labelOpt shouldBe defined
    val foundLabel = labelOpt.get

    foundLabel.indices should not be empty
    foundLabel.defaultIndex shouldBe defined
    foundLabel.metaProps should not be empty

    println(foundLabel.srcService)
    println(foundLabel.tgtService)
    println(foundLabel.service)
    // NOTE(review): the original bound locals named srcColumn/tgtColumn to
    // srcService/tgtService — presumably the srcColumn/tgtColumn accessors were
    // intended; confirm against the Label model before changing behavior.
    println(foundLabel.srcService)
    println(foundLabel.tgtService)
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala
new file mode 100644
index 0000000..ccae6b6
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala

package
org.apache.s2graph.core.mysqls

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scalikejdbc._

/**
 * Verifies that Experiment/bucket lookups are served from the model cache and
 * that cached entries are refreshed once the configured TTL elapses.
 *
 * beforeAll wires Model with a short cache TTL (`cache.ttl.seconds` = Ttl) and
 * seeds exactly one experiment ("exp1") with a single bucket ("imp1") via raw SQL.
 */
class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  // Cache TTL in seconds pushed into Model's config; kept tiny so the test can wait it out.
  val Ttl = 2
  override def beforeAll(): Unit = {
    /*
    maxSize = config.getInt("cache.max.size")
    ttl = config.getInt("cache.ttl.seconds")
    */
    val props = new Properties()
    props.setProperty("cache.ttl.seconds", Ttl.toString)
    Model.apply(ConfigFactory.load(ConfigFactory.parseProperties(props)))

    implicit val session = AutoSession
    // Start from a clean slate; buckets reference experiments, so delete buckets first.
    sql"""DELETE FROM buckets""".update().apply()
    sql"""DELETE FROM experiments""".update().apply()

    val expId = sql"""INSERT INTO experiments(service_id, service_name, `name`, description) VALUES(1, "s1", "exp1", "")""".updateAndReturnGeneratedKey().apply()
    sql"""INSERT INTO
          buckets(experiment_id, modular, http_verb, api_path, request_body, impression_id)
          VALUES($expId, "1~100", "POST", "/a/b/c", "None", "imp1")""".update().apply()

  }

  "Experiment" should "find bucket list" in {
    Experiment.findBy(1, "exp1") should not be empty

    Experiment.findBy(1, "exp1").foreach { exp =>
      val bucket = exp.buckets.head
      bucket.impressionId should equal("imp1")
    }
  }

  it should "update bucket list after cache ttl time" in {
    Experiment.findBy(1, "exp1").foreach { exp =>
      val bucket = exp.buckets.head
      bucket.impressionId should equal("imp1")

      implicit val session = AutoSession

      // Mutate the row behind the cache's back; the cached Experiment still holds "imp1".
      sql"""UPDATE buckets SET impression_id = "imp2" WHERE id = ${bucket.id}""".update().apply()
    }

    // sleep ttl time
    // NOTE(review): wall-clock sleeps make this test sensitive to machine load — confirm acceptable for CI.
    Thread.sleep((Ttl + 1) * 1000)

    // update experiment and bucket
    // Touching buckets after expiry is what triggers the (presumably asynchronous) cache refresh — TODO confirm.
    Experiment.findBy(1, "exp1").foreach(exp => exp.buckets)

    // wait for cache updating
    Thread.sleep(1 * 1000)

    // should be updated
    Experiment.findBy(1, "exp1").foreach { exp =>
      exp.buckets.head.impressionId should equal("imp2")
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
// ---- s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala (new file) ----
package org.apache.s2graph.core.parsers

import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.rest.TemplateHelper
import org.apache.s2graph.core.types._
import org.scalatest.{FunSuite, Matchers}
import play.api.libs.json.Json

/**
 * Exercises WhereParser: SQL-like `where` clauses are parsed and then evaluated
 * against concrete edges, for both storage schema versions (VERSION1/VERSION2).
 * Also covers TemplateHelper's `${...}` time-variable expansion.
 */
class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
  initTests()

  // dummy data for dummy edge
  // NOTE(review): initTests() is invoked twice (see above) — looks redundant; confirm it is intentional.
  initTests()

  import HBaseType.{VERSION1, VERSION2}

  val ts = System.currentTimeMillis()
  // Timestamp property every test edge carries in its props map.
  val dummyTs = (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))

  /**
   * Builds the vertex-id/vertex fixture for the given schema version.
   * Returns (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, version).
   */
  def ids(version: String) = {
    val colId = if (version == VERSION2) columnV2.id.get else column.id.get
    val srcId = SourceVertexId(colId, InnerVal.withLong(1, version))
    val tgtId = TargetVertexId(colId, InnerVal.withLong(2, version))

    val srcIdStr = SourceVertexId(colId, InnerVal.withStr("abc", version))
    val tgtIdStr = TargetVertexId(colId, InnerVal.withStr("def", version))

    val srcVertex = Vertex(srcId, ts)
    val tgtVertex = Vertex(tgtId, ts)
    val srcVertexStr = Vertex(srcIdStr, ts)
    val tgtVertexStr = Vertex(tgtIdStr, ts)
    (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, version)
  }


  /**
   * Parses `sql` with WhereParser(label), asserts the parse succeeded, then asserts
   * that filtering `edge` with the parsed clause yields `expected`.
   * Prints diagnostic context on mismatch before failing.
   */
  def validate(label: Label)(edge: Edge)(sql: String)(expected: Boolean) = {
    val whereOpt = WhereParser(label).parse(sql)
    whereOpt.isSuccess shouldBe true

    println("=================================================================")
    println(sql)
    println(whereOpt.get)

    val ret = whereOpt.get.filter(edge)
    if (ret != expected) {
      println("==================")
      println(s"$whereOpt")
      println(s"$edge")
      println("==================")
    }
    ret shouldBe expected
  }

  test("check where clause not nested") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner)
      val f = validate(label)(edge) _

      /** labelName label is long-long relation */
      f(s"_to=${tgtVertex.innerId.toString}")(true)

      // currently this throw exception since label`s _to is long type.
      f(s"_to=19230495")(false)
      f(s"_to!=19230495")(true)
    }
  }

  test("check where clause nested") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner)

      val f = validate(label)(edge) _

      // time == 3
      f("time >= 3")(true)
      f("time > 2")(true)
      f("time <= 3")(true)
      f("time < 2")(false)

      f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = false")(false)
      f("(time in (1, 2, 3) or is_blocked = true) or is_hidden = false")(true)
      f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = true")(true)
      f("(time in (1, 2, 3) or is_blocked = true) and is_hidden = true")(true)

      // Mixed spacing inside the clauses is deliberate: the parser must tolerate it.
      f("((time in ( 1, 2, 3) and weight between 1 and 10) or is_hidden=false)")(true)
      f("(time in (1, 2, 4 ) or weight between 1 and 9) or (is_hidden = false)")(false)
      f("(time in ( 1,2,4 ) or weight between 1 and 9) or is_hidden= true")(true)
      f("(time in (1,2,3) or weight between 1 and 10) and is_hidden =false")(false)
    }
  }

  test("check where clause with from/to long") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val labelWithDirection = if (schemaVer == VERSION2) labelWithDirV2 else labelWithDir
      val edge = Edge(srcVertex, tgtVertex, labelWithDirection, 0.toByte, ts, propsInner)
      // NOTE(review): `lname` is computed but never used below — confirm whether it can be dropped.
      val lname = if (schemaVer == VERSION2) labelNameV2 else labelName
      val f = validate(label)(edge) _

      f(s"_from = -1 or _to = ${tgtVertex.innerId.value}")(true)
      f(s"_from = ${srcVertex.innerId.value} and _to = ${tgtVertex.innerId.value}")(true)
      f(s"_from = ${tgtVertex.innerId.value} and _to = 102934")(false)
      f(s"_from = -1")(false)
      f(s"_from in (-1, -0.1)")(false)
    }
  }


  test("check where clause with parent") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 1, "name" -> "abc")
      val parentJs = Json.obj("is_hidden" -> false, "is_blocked" -> false, "weight" -> 20, "time" -> 3, "name" -> "a")

      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val parentPropsInner = Management.toProps(label, parentJs.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs

      // Three-level chain: edge -> parentEdge -> grandParentEdge (parent and grandparent share props).
      val grandParentEdge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, parentPropsInner)
      val parentEdge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, parentPropsInner,
        parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0)))
      val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner,
        parentEdges = Seq(EdgeWithScore(parentEdge, 1.0)))

      println(edge.toString)
      println(parentEdge.toString)
      println(grandParentEdge.toString)

      val f = validate(label)(edge) _

      // Compare edge's prop(`_from`) with edge's prop(`name`)
      f("_from = 1")(true)
      f("_to = 2")(true)
      f("_from = 123")(false)
      f("_from = time")(true)

      // Compare edge's prop(`weight`) with edge's prop(`time`)
      f("weight = time")(false)
      f("weight = is_blocked")(false)

      // Compare edge's prop(`weight`) with parent edge's prop(`weight`)
      f("_parent.is_blocked = is_blocked")(true)
      f("is_hidden = _parent.is_hidden")(false)
      f("_parent.weight = weight")(false)

      // Compare edge's prop(`is_hidden`) with parent of parent edge's prop(`is_hidden`)
      f("_parent._parent.is_hidden = is_hidden")(false)
      f("_parent._parent.is_blocked = is_blocked")(true)
      f("_parent._parent.weight = weight")(false)
      f("_parent._parent.weight = _parent.weight")(true)
    }
  }

  test("replace reserved") {
    val ts = 0
    import TemplateHelper._

    calculate(ts, 1, "hour") should be(hour + ts)
    calculate(ts, 1, "day") should be(day + ts)

    // Unit names are case-insensitive.
    calculate(ts + 10, 1, "HOUR") should be(hour + ts + 10)
    calculate(ts + 10, 1, "DAY") should be(day + ts + 10)

    val body = """{
        "day": ${1day},
        "hour": ${1hour},
        "-day": "${-10 day}",
        "-hour": ${-10 hour},
        "now": "${now}"
      }
      """

    val parsed = replaceVariable(ts, body)
    val json = Json.parse(parsed)

    (json \ "day").as[Long] should be (1 * day + ts)
    (json \ "hour").as[Long] should be (1 * hour + ts)

    (json \ "-day").as[Long] should be (-10 * day + ts)
    (json \ "-hour").as[Long] should be (-10 * hour + ts)

    (json \ "now").as[Long] should be (ts)

    val otherBody = """{
        "nextday": "${next_day}",
        "3dayago": "${next_day - 3 day}",
        "nexthour": "${next_hour}"
      }"""

    // NOTE(review): expectations below are computed from one currentTs but evaluated over
    // currentTs+1..currentTs+999 — this can fail if the run straddles an hour/day boundary; confirm.
    val currentTs = System.currentTimeMillis()
    val expectedDayTs = currentTs / day * day + day
    val expectedHourTs = currentTs / hour * hour + hour
    val threeDayAgo = expectedDayTs - 3 * day
    val currentTsLs = (1 until 1000).map(currentTs + _)

    currentTsLs.foreach { ts =>
      val parsed = replaceVariable(ts, otherBody)
      val json = Json.parse(parsed)

      (json \ "nextday").as[Long] should be(expectedDayTs)
      (json \ "nexthour").as[Long] should be(expectedHourTs)
      (json \ "3dayago").as[Long] should be(threeDayAgo)
    }
  }

  // test("time decay") {
  //   val ts = System.currentTimeMillis()
  //
  //   for {
  //     i <- (0 until 10)
  //   } {
  //     val timeUnit = 60 * 60
  //     val diff = i * timeUnit
  //     val x = TimeDecay(1.0, 0.05, timeUnit)
  //     println(x.decay(diff))
  //   }
  // }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
// ---- s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala (new file) ----
package org.apache.s2graph.core.storage.hbase

import org.scalatest.{FunSuite, Matchers}

/**
 * Placeholder suite: the secured-cluster connection test below requires a
 * kerberized HBase cluster and therefore stays commented out.
 */
class AsynchbaseStorageTest extends FunSuite with Matchers {

  /** need secured cluster */
//  test("test secure cluster connection") {
//    val config = ConfigFactory.parseMap(
//      Map(
//        "hbase.zookeeper.quorum" -> "localhost",
//        "hbase.security.auth.enable" -> "true",
//        "hbase.security.authentication" -> "kerberos",
//        "hbase.kerberos.regionserver.principal" -> "hbase/[email protected]",
//        "hbase.sasl.clientconfig" -> "Client",
//        "java.security.krb5.conf" -> "krb5.conf",
//        "java.security.auth.login.config" -> "async-client.jaas.conf")
//    )
//
//    val client = AsynchbaseStorage.makeClient(config)
//    val table = "test".getBytes()
//
//    val putRequest = new PutRequest(table, "a".getBytes(), "e".getBytes, "a".getBytes, "a".getBytes)
//    val getRequest = new GetRequest(table, "a".getBytes(), "e".getBytes)
//    val ret = client.put(putRequest).join()
//    val kvs = client.get(getRequest).join()
//    for {
//      kv <- kvs
//    } {
//      println(kv.toString)
//    }
//  }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala new file mode 100644 index 0000000..84e7458 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -0,0 +1,81 @@ +package org.apache.s2graph.core.storage.hbase + +import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.{IndexEdge, TestCommonWithModels, Vertex} +import org.scalatest.{FunSuite, Matchers} + + +class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { + initTests() + + /** + * check if storage serializer/deserializer can translate from/to bytes array. + * @param l: label for edge. + * @param ts: timestamp for edge. + * @param to: to VertexId for edge. + * @param props: expected props of edge. 
+ */ + def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLike]): Unit = { + val from = InnerVal.withLong(1, l.schemaVersion) + val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) + val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) + val vertex = Vertex(vertexId, ts) + val tgtVertex = Vertex(tgtVertexId, ts) + val labelWithDir = LabelWithDirection(l.id.get, 0) + + val indexEdge = IndexEdge(vertex, tgtVertex, labelWithDir, 0, ts, LabelIndex.DefaultSeq, props) + val _indexEdgeOpt = graph.storage.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(queryParam, + graph.storage.indexEdgeSerializer(indexEdge).toKeyValues, l.schemaVersion, None) + + _indexEdgeOpt should not be empty + indexEdge should be(_indexEdgeOpt.get) + } + + + /** note that props have to be properly set up for equals */ + test("test serializer/deserializer for index edge.") { + val ts = System.currentTimeMillis() + for { + l <- Seq(label, labelV2, labelV3, labelV4) + } { + val to = InnerVal.withLong(101, l.schemaVersion) + val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, + 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion)) + + check(l, ts, to, props) + } + } + + test("test serializer/deserializer for degree edge.") { + val ts = System.currentTimeMillis() + for { + l <- Seq(label, labelV2, labelV3, labelV4) + } { + val to = InnerVal.withStr("0", l.schemaVersion) + val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val props = Map( + LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion), + LabelMeta.timeStampSeq -> tsInnerVal) + + check(l, ts, to, props) + } + } + + test("test serializer/deserializer for incrementCount index edge.") { + val ts = System.currentTimeMillis() + for { + l <- Seq(label, labelV2, labelV3, labelV4) + } { + val to = InnerVal.withLong(101, l.schemaVersion) + + val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val props = 
Map(LabelMeta.timeStampSeq -> tsInnerVal, + 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion), + LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion)) + + check(l, ts, to, props) + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala new file mode 100644 index 0000000..607f46d --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala @@ -0,0 +1,128 @@ +package org.apache.s2graph.core.types + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.TestCommonWithModels +import org.scalatest.{FunSuite, Matchers} + +class InnerValTest extends FunSuite with Matchers with TestCommonWithModels { + initTests() + + import HBaseType.VERSION2 + val decimals = List( + BigDecimal(Long.MinValue), + BigDecimal(Int.MinValue), + BigDecimal(Double.MinValue), + BigDecimal(Float.MinValue), + BigDecimal(Short.MinValue), + BigDecimal(Byte.MinValue), + + BigDecimal(-1), + BigDecimal(0), + BigDecimal(1), + BigDecimal(Long.MaxValue), + BigDecimal(Int.MaxValue), + BigDecimal(Double.MaxValue), + BigDecimal(Float.MaxValue), + BigDecimal(Short.MaxValue), + BigDecimal(Byte.MaxValue) + ) + val booleans = List( + false, true + ) + val strings = List( + "abc", "abd", "ac", "aca" + ) + val texts = List( + (0 until 1000).map(x => "a").mkString + ) + val blobs = List( + (0 until 1000).map(x => Byte.MaxValue).toArray + ) + def testEncodeDecode(ranges: List[InnerValLike], version: String) = { + for { + innerVal <- ranges + } { + val bytes = innerVal.bytes + val (decoded, numOfBytesUsed) = InnerVal.fromBytes(bytes, 0, bytes.length, version) + innerVal == decoded shouldBe true + bytes.length == 
numOfBytesUsed shouldBe true + } + } + // test("big decimal") { + // for { + // version <- List(VERSION2, VERSION1) + // } { + // val innerVals = decimals.map { num => InnerVal.withNumber(num, version)} + // testEncodeDecode(innerVals, version) + // } + // } + // test("text") { + // for { + // version <- List(VERSION2) + // } { + // val innerVals = texts.map { t => InnerVal.withStr(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + // test("string") { + // for { + // version <- List(VERSION2, VERSION1) + // } { + // val innerVals = strings.map { t => InnerVal.withStr(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + // test("blob") { + // for { + // version <- List(VERSION2) + // } { + // val innerVals = blobs.map { t => InnerVal.withBlob(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + // test("boolean") { + // for { + // version <- List(VERSION2, VERSION1) + // } { + // val innerVals = booleans.map { t => InnerVal.withBoolean(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + test("korean") { + val small = InnerVal.withStr("ê°", VERSION2) + val large = InnerVal.withStr("ë", VERSION2) + val smallBytes = small.bytes + val largeBytes = large.bytes + + println (Bytes.compareTo(smallBytes, largeBytes)) + true + } + // test("innerVal") { + // val srcVal = InnerVal.withLong(44391298, VERSION2) + // val srcValV1 = InnerVal.withLong(44391298, VERSION1) + // val tgtVal = InnerVal.withLong(7295564, VERSION2) + // + // val a = VertexId(0, srcVal) + // val b = SourceVertexId(0, srcVal) + // val c = TargetVertexId(0, srcVal) + // val aa = VertexId(0, srcValV1) + // val bb = SourceVertexId(0, srcValV1) + // val cc = TargetVertexId(0, srcValV1) + // println(a.bytes.toList) + // println(b.bytes.toList) + // println(c.bytes.toList) + // + // println(aa.bytes.toList) + // println(bb.bytes.toList) + // println(cc.bytes.toList) + // } + // test("aa") { + // val bytes = 
InnerVal.withLong(Int.MaxValue, VERSION2).bytes + // val pbr = new SimplePositionedByteRange(bytes) + // pbr.setOffset(1) + // println(pbr.getPosition) + // val num = OrderedBytes.decodeNumericAsBigDecimal(pbr) + // println(pbr.getPosition) + // true + // } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala new file mode 100644 index 0000000..530d775 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala @@ -0,0 +1,25 @@ +package org.apache.s2graph.counter + +import play.api.libs.json.Json + +// item1 -> likedCount -> month:2015-10, 1 +// edge + // policyId = Label.findByName(likedCount).id.get + // item = edge.srcVertexId + // results = +case class TrxLog(success: Boolean, policyId: Int, item: String, results: Iterable[TrxLogResult]) + +// interval = m, ts = 2015-10, "age.gender.20.M", 1, 2 +case class TrxLogResult(interval: String, ts: Long, dimension: String, value: Long, result: Long = -1) + +object TrxLogResult { + implicit val writes = Json.writes[TrxLogResult] + implicit val reads = Json.reads[TrxLogResult] + implicit val formats = Json.format[TrxLogResult] +} + +object TrxLog { + implicit val writes = Json.writes[TrxLog] + implicit val reads = Json.reads[TrxLog] + implicit val formats = Json.format[TrxLog] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala new file mode 100644 index 0000000..219568b --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala @@ -0,0 +1,30 @@ +package org.apache.s2graph.counter.config + +import com.typesafe.config.Config + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +abstract class ConfigFunctions(conf: Config) { + def getOrElse[T: ClassTag](key: String, default: T): T = { + val ret = if (conf.hasPath(key)) (default match { + case _: String => conf.getString(key) + case _: Int | _: Integer => conf.getInt(key) + case _: Float | _: Double => conf.getDouble(key) + case _: Boolean => conf.getBoolean(key) + case _ => default + }).asInstanceOf[T] + else default + println(s"${this.getClass.getName}: $key -> $ret") + ret + } + + def getConfigMap(path: String): Map[String, String] = { + conf.getConfig(path).entrySet().map { entry => + val key = s"$path.${entry.getKey}" + val value = conf.getString(key) + println(s"${this.getClass.getName}: $key -> $value") + key -> value + }.toMap + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala new file mode 100644 index 0000000..c9e1e38 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala @@ -0,0 +1,44 @@ +package org.apache.s2graph.counter.config + +import com.typesafe.config.Config + +import scala.collection.JavaConversions._ + +class S2CounterConfig(config: Config) extends ConfigFunctions(config) { + // HBase + lazy val HBASE_ZOOKEEPER_QUORUM = 
getOrElse("hbase.zookeeper.quorum", "") + lazy val HBASE_TABLE_NAME = getOrElse("hbase.table.name", "s2counter") + lazy val HBASE_TABLE_POOL_SIZE = getOrElse("hbase.table.pool.size", 100) + lazy val HBASE_CONNECTION_POOL_SIZE = getOrElse("hbase.connection.pool.size", 10) + + lazy val HBASE_CLIENT_IPC_POOL_SIZE = getOrElse("hbase.client.ipc.pool.size", 5) + lazy val HBASE_CLIENT_MAX_TOTAL_TASKS = getOrElse("hbase.client.max.total.tasks", 100) + lazy val HBASE_CLIENT_MAX_PERSERVER_TASKS = getOrElse("hbase.client.max.perserver.tasks", 5) + lazy val HBASE_CLIENT_MAX_PERREGION_TASKS = getOrElse("hbase.client.max.perregion.tasks", 1) + lazy val HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = getOrElse("hbase.client.scanner.timeout.period", 300) + lazy val HBASE_CLIENT_OPERATION_TIMEOUT = getOrElse("hbase.client.operation.timeout", 100) + lazy val HBASE_CLIENT_RETRIES_NUMBER = getOrElse("hbase.client.retries.number", 1) + + // MySQL + lazy val DB_DEFAULT_DRIVER = getOrElse("db.default.driver", "com.mysql.jdbc.Driver") + lazy val DB_DEFAULT_URL = getOrElse("db.default.url", "") + lazy val DB_DEFAULT_USER = getOrElse("db.default.user", "graph") + lazy val DB_DEFAULT_PASSWORD = getOrElse("db.default.password", "graph") + + // Redis + lazy val REDIS_INSTANCES = (for { + s <- config.getStringList("redis.instances") + } yield { + val sp = s.split(':') + (sp(0), if (sp.length > 1) sp(1).toInt else 6379) + }).toList + + // Graph + lazy val GRAPH_URL = getOrElse("s2graph.url", "http://localhost:9000") + lazy val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL) + + // Cache + lazy val CACHE_TTL_SECONDS = getOrElse("cache.ttl.seconds", 600) + lazy val CACHE_MAX_SIZE = getOrElse("cache.max.size", 10000) + lazy val CACHE_NEGATIVE_TTL_SECONDS = getOrElse("cache.negative.ttl.seconds", CACHE_TTL_SECONDS) + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala 
---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala new file mode 100644 index 0000000..0f48b0b --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala @@ -0,0 +1,13 @@ +package org.apache.s2graph.counter.core + + +trait BytesUtil { + def getRowKeyPrefix(id: Int): Array[Byte] + + def toBytes(key: ExactKeyTrait): Array[Byte] + def toBytes(eq: ExactQualifier): Array[Byte] + def toBytes(tq: TimedQualifier): Array[Byte] + + def toExactQualifier(bytes: Array[Byte]): ExactQualifier + def toTimedQualifier(bytes: Array[Byte]): TimedQualifier +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala new file mode 100644 index 0000000..a7e40ae --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala @@ -0,0 +1,243 @@ +package org.apache.s2graph.counter.core + +import com.typesafe.config.Config +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit.IntervalUnit +import org.apache.s2graph.counter.{TrxLogResult, TrxLog} +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit.IntervalUnit +import org.apache.s2graph.counter.decay.ExpDecayFormula +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.util.{FunctionParser, CollectionCacheConfig, CollectionCache} +import org.slf4j.LoggerFactory +import scala.concurrent.duration._ +import 
scala.concurrent.{Await, ExecutionContext, Future} + + + +case class ExactCounterRow(key: ExactKeyTrait, value: Map[ExactQualifier, Long]) +case class FetchedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Long]) +case class DecayedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Double]) +case class FetchedCountsGrouped(exactKey: ExactKeyTrait, intervalWithCountMap: Map[(IntervalUnit, Map[String, String]), Map[ExactQualifier, Long]]) + +class ExactCounter(config: Config, storage: ExactStorage) { + import ExactCounter._ + + val syncDuration = Duration(10, SECONDS) + private val log = LoggerFactory.getLogger(getClass) + + val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60)) + + // dimension: age, value of ages + def getCountAsync(policy: Counter, + itemId: String, + intervals: Seq[IntervalUnit], + from: Long, + to: Long, + dimension: Map[String, Set[String]]) + (implicit ex: ExecutionContext): Future[Option[FetchedCountsGrouped]] = { + for { + fetchedCounts <- getCountsAsync(policy, Seq(itemId), + intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension) + } yield { + fetchedCounts.headOption + } + } + + // multi item, time range and multi dimension + def getCountsAsync(policy: Counter, + items: Seq[String], + timeRange: Seq[(TimedQualifier, TimedQualifier)], + dimQuery: Map[String, Set[String]]) + (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { + storage.get(policy, items, timeRange, dimQuery) + } + + def getCount(policy: Counter, + itemId: String, + intervals: Seq[IntervalUnit], + from: Long, + to: Long, + dimension: Map[String, Set[String]]) + (implicit ex: ExecutionContext): Option[FetchedCountsGrouped] = { + getCounts(policy, Seq(itemId), + intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension).headOption + } + + def 
getCount(policy: Counter, + itemId: String, + intervals: Seq[IntervalUnit], + from: Long, + to: Long) + (implicit ex: ExecutionContext): Option[FetchedCounts] = { + val future = storage.get(policy, + Seq(itemId), + intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to)))) + Await.result(future, syncDuration).headOption + } + + // multi item, time range and multi dimension + def getCounts(policy: Counter, + items: Seq[String], + timeRange: Seq[(TimedQualifier, TimedQualifier)], + dimQuery: Map[String, Set[String]]) + (implicit ex: ExecutionContext): Seq[FetchedCountsGrouped] = { + Await.result(storage.get(policy, items, timeRange, dimQuery), syncDuration) + } + + def getRelatedCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])]) + (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = { + val queryKeyWithQualifiers = { + for { + (itemKey, qualifiers) <- keyWithQualifiers + } yield { + val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey) + (relKey, qualifiers) + } + } + val future = storage.get(policy, queryKeyWithQualifiers) + + for { + FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration) + } yield { + exactKey.itemKey -> exactQualifierToLong + } + }.toMap + + def getPastCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])]) + (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = { + // query paste count + val queryKeyWithQualifiers = { + for { + (itemKey, qualifiers) <- keyWithQualifiers + } yield { + val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey) + (relKey, qualifiers.map(eq => eq.copy(tq = eq.tq.add(-1)))) + } + } + val future = storage.get(policy, queryKeyWithQualifiers) + + for { + FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration) + } yield { + // restore tq + exactKey.itemKey -> exactQualifierToLong.map { case (eq, v) => + 
eq.copy(tq = eq.tq.add(1)) -> v + } + } + }.toMap + + def getDecayedCountsAsync(policy: Counter, + items: Seq[String], + timeRange: Seq[(TimedQualifier, TimedQualifier)], + dimQuery: Map[String, Set[String]], + qsSum: Option[String])(implicit ex: ExecutionContext): Future[Seq[DecayedCounts]] = { + val groupedTimeRange = timeRange.groupBy(_._1.q) + getCountsAsync(policy, items, timeRange, dimQuery).map { seq => + for { + FetchedCountsGrouped(k, intervalWithCountMap) <- seq + } yield { + DecayedCounts(k, { + for { + ((interval, dimKeyValues), grouped) <- intervalWithCountMap + } yield { + val (tqFrom, tqTo) = groupedTimeRange(interval).head + val formula = { + for { + strSum <- qsSum + (func, arg) <- FunctionParser(strSum) + } yield { + // apply function + func.toLowerCase match { + case "exp_decay" => ExpDecayFormula.byMeanLifeTime(arg.toLong * TimedQualifier.getTsUnit(interval)) + case _ => throw new UnsupportedOperationException(s"unknown function: $strSum") + } + } + } + ExactQualifier(tqFrom, dimKeyValues) -> { + grouped.map { case (eq, count) => + formula match { + case Some(decay) => + decay(count, tqTo.ts - eq.tq.ts) + case None => + count + } + }.sum + } + } + }) + } + } + } + + def updateCount(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Seq[TrxLog] = { + ready(policy) match { + case true => + val updateResults = storage.update(policy, counts) + for { + (exactKey, values) <- counts + results = updateResults.getOrElse(exactKey, Nil.toMap) + } yield { + TrxLog(results.nonEmpty, exactKey.policyId, exactKey.itemKey, makeTrxLogResult(values, results)) + } + case false => + Nil + } + } + + def deleteCount(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { + storage.delete(policy, keys) + } + + private def makeTrxLogResult(values: ExactValueMap, results: ExactValueMap): Seq[TrxLogResult] = { + for { + (eq, value) <- values + } yield { + val result = results.getOrElse(eq, -1l) + TrxLogResult(eq.tq.q.toString, eq.tq.ts, eq.dimension, value, 
result) + } + }.toSeq + + def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { + storage.insertBlobValue(policy, keys) + } + + def getBlobValue(policy: Counter, blobId: String): Option[String] = { + storage.getBlobValue(policy, blobId) + } + + def prepare(policy: Counter) = { + storage.prepare(policy) + } + + def destroy(policy: Counter) = { + storage.destroy(policy) + } + + def ready(policy: Counter): Boolean = { + storageStatusCache.withCache(s"${policy.id}") { + val ready = storage.ready(policy) + if (!ready) { + // if key is not in cache, log message + log.warn(s"${policy.service}.${policy.action} storage is not ready.") + } + Some(ready) + }.getOrElse(false) + } +} + +object ExactCounter { + object ColumnFamily extends Enumeration { + type ColumnFamily = Value + + val SHORT = Value("s") + val LONG = Value("l") + } + import IntervalUnit._ + val intervalsMap = Map(MINUTELY -> ColumnFamily.SHORT, HOURLY -> ColumnFamily.SHORT, + DAILY -> ColumnFamily.LONG, MONTHLY -> ColumnFamily.LONG, TOTAL -> ColumnFamily.LONG) + + val blobCF = ColumnFamily.LONG.toString.getBytes + val blobColumn = "b".getBytes + + type ExactValueMap = Map[ExactQualifier, Long] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala new file mode 100644 index 0000000..b472649 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala @@ -0,0 +1,26 @@ +package org.apache.s2graph.counter.core + +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.models.Counter.ItemType +import org.apache.s2graph.counter.models.Counter.ItemType.ItemType +import 
org.apache.s2graph.counter.util.Hashes + +trait ExactKeyTrait { + def policyId: Int + def version: Byte + def itemType: ItemType + def itemKey: String +} + +case class ExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String) extends ExactKeyTrait +case class BlobExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String, itemId: String) extends ExactKeyTrait + +object ExactKey { + def apply(policy: Counter, itemId: String, checkItemType: Boolean): ExactKeyTrait = { + if (checkItemType && policy.itemType == ItemType.BLOB) { + BlobExactKey(policy.id, policy.version, ItemType.BLOB, Hashes.sha1(itemId), itemId) + } else { + ExactKey(policy.id, policy.version, policy.itemType, itemId) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala new file mode 100644 index 0000000..96104bf --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala @@ -0,0 +1,73 @@ +package org.apache.s2graph.counter.core + +import java.util +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit.IntervalUnit +import scala.collection.JavaConversions._ + +case class ExactQualifier(tq: TimedQualifier, dimKeyValues: Map[String, String], dimension: String) { + def checkDimensionEquality(dimQuery: Map[String, Set[String]]): Boolean = { +// println(s"self: $dimKeyValues, query: $dimQuery") + dimQuery.size == dimKeyValues.size && { + for { + (k, v) <- dimKeyValues + } yield { + dimQuery.get(k).exists(qv => qv.isEmpty || qv.contains(v)) + } + }.forall(x => x) + } +} + +object 
ExactQualifier { + val cache: LoadingCache[String, Map[String, String]] = CacheBuilder.newBuilder() + .maximumSize(10000) + .build( + new CacheLoader[String, Map[String, String]]() { + def load(s: String): Map[String, String] = { + strToDimensionMap(s) + } + } + ) + + def apply(tq: TimedQualifier, dimension: String): ExactQualifier = { + ExactQualifier(tq, cache.get(dimension), dimension) + } + + def apply(tq: TimedQualifier, dimKeyValues: Map[String, String]): ExactQualifier = { + ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues)) + } + + def makeSortedDimension(dimKeyValues: Map[String, String]): Iterator[String] = { + val sortedDimKeyValues = new util.TreeMap[String, String](dimKeyValues) + sortedDimKeyValues.keysIterator ++ sortedDimKeyValues.valuesIterator + } + + def makeDimensionStr(dimKeyValues: Map[String, String]): String = { + makeSortedDimension(dimKeyValues).mkString(".") + } + + def getQualifiers(intervals: Seq[IntervalUnit], ts: Long, dimKeyValues: Map[String, String]): Seq[ExactQualifier] = { + for { + tq <- TimedQualifier.getQualifiers(intervals, ts) + } yield { + ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues)) + } + } + + def strToDimensionMap(dimension: String): Map[String, String] = { + val dimSp = { + val sp = dimension.split('.') + if (dimension == ".") { + Array("", "") + } + else if (dimension.nonEmpty && dimension.last == '.') { + sp ++ Array("") + } else { + sp + } + } + val dimKey = dimSp.take(dimSp.length / 2) + val dimVal = dimSp.takeRight(dimSp.length / 2) + dimKey.zip(dimVal).toMap + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala new file mode 100644 
index 0000000..c34e631 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala @@ -0,0 +1,32 @@ +package org.apache.s2graph.counter.core + +import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap +import org.apache.s2graph.counter.models.Counter + +import scala.concurrent.{ExecutionContext, Future} + +trait ExactStorage { + // for range query and check dimension + def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(TimedQualifier, TimedQualifier)], + dimQuery: Map[String, Set[String]]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] + // for range query + def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(TimedQualifier, TimedQualifier)]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] + // for query exact qualifier + def get(policy: Counter, + queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] + def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] + def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit + def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] + def getBlobValue(policy: Counter, blobId: String): Option[String] + + def prepare(policy: Counter): Unit + def destroy(policy: Counter): Unit + def ready(policy: Counter): Boolean +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala new file mode 100644 index 0000000..e3ea1b8 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala @@ -0,0 +1,101 @@ 
+package org.apache.s2graph.counter.core + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.typesafe.config.Config +import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.util.{CollectionCacheConfig, CollectionCache} +import org.slf4j.LoggerFactory +import scala.collection.JavaConversions._ + +case class RankingRow(key: RankingKey, value: Map[String, RankingValue]) +case class RateRankingRow(key: RankingKey, value: Map[String, RateRankingValue]) + +class RankingCounter(config: Config, storage: RankingStorage) { + private val log = LoggerFactory.getLogger(getClass) + + val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60)) + + val cache: LoadingCache[RankingKey, RankingResult] = CacheBuilder.newBuilder() + .maximumSize(1000000) + .expireAfterWrite(10l, TimeUnit.MINUTES) + .build( + new CacheLoader[RankingKey, RankingResult]() { + def load(rankingKey: RankingKey): RankingResult = { +// log.warn(s"cache load: $rankingKey") + storage.getTopK(rankingKey, Int.MaxValue).getOrElse(RankingResult(-1, Nil)) + } + } + ) + + def getTopK(rankingKey: RankingKey, k: Int = Int.MaxValue): Option[RankingResult] = { + val tq = rankingKey.eq.tq + if (TimedQualifier.getQualifiers(Seq(tq.q), System.currentTimeMillis()).head == tq) { + // do not use cache + storage.getTopK(rankingKey, k) + } + else { + val result = cache.get(rankingKey) + if (result.values.nonEmpty) { + Some(result.copy(values = result.values.take(k))) + } + else { + None + } + } + } + + def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = { + storage.update(key, value, k) + } + + def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = { + storage.update(values, k) + } + + def delete(key: RankingKey): Unit = { + storage.delete(key) + } + + def 
getAllItems(keys: Seq[RankingKey], k: Int = Int.MaxValue): Seq[String] = { + val oldKeys = keys.filter(key => TimedQualifier.getQualifiers(Seq(key.eq.tq.q), System.currentTimeMillis()).head != key.eq.tq) + val cached = cache.getAllPresent(oldKeys) + val missed = keys.diff(cached.keys.toSeq) + val found = storage.getTopK(missed, k) + +// log.warn(s"cached: ${cached.size()}, missed: ${missed.size}") + + for { + (key, result) <- found + } { + cache.put(key, result) + } + + for { + (key, RankingResult(totalScore, values)) <- cached ++ found + (item, score) <- values + } yield { + item + } + }.toSeq.distinct + + def prepare(policy: Counter): Unit = { + storage.prepare(policy) + } + + def destroy(policy: Counter): Unit = { + storage.destroy(policy) + } + + def ready(policy: Counter): Boolean = { + storageStatusCache.withCache(s"${policy.id}") { + Some(storage.ready(policy)) + }.getOrElse(false) + } +} + +object RankingCounter { + type RankingValueMap = Map[String, RankingValue] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala new file mode 100644 index 0000000..9e3dc9a --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala @@ -0,0 +1,4 @@ +package org.apache.s2graph.counter.core + + +case class RankingKey(policyId: Int, version: Byte, eq: ExactQualifier) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala ---------------------------------------------------------------------- diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala new file mode 100644 index 0000000..e738614 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala @@ -0,0 +1,3 @@ +package org.apache.s2graph.counter.core + +case class RankingResult(totalScore: Double, values: Seq[(String, Double)]) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala new file mode 100644 index 0000000..34af94f --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala @@ -0,0 +1,17 @@ +package org.apache.s2graph.counter.core + +import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap +import org.apache.s2graph.counter.models.Counter + + +trait RankingStorage { + def getTopK(key: RankingKey, k: Int): Option[RankingResult] + def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] + def update(key: RankingKey, value: RankingValueMap, k: Int): Unit + def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit + def delete(key: RankingKey) + + def prepare(policy: Counter): Unit + def destroy(policy: Counter): Unit + def ready(policy: Counter): Boolean +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala new file mode 100644 index 0000000..7691f35 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala @@ -0,0 +1,15 @@ +package org.apache.s2graph.counter.core + +/** + * ranking score and increment value + * @param score ranking score + * @param increment increment value for v1 + */ +case class RankingValue(score: Double, increment: Double) + +object RankingValue { + def reduce(r1: RankingValue, r2: RankingValue): RankingValue = { + // maximum score and sum of increment + RankingValue(math.max(r1.score, r2.score), r1.increment + r2.increment) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala new file mode 100644 index 0000000..370926f --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala @@ -0,0 +1,15 @@ +package org.apache.s2graph.counter.core + +case class RateRankingValue(actionScore: Double, baseScore: Double) { + // increment score do not use. + lazy val rankingValue: RankingValue = { + RankingValue(actionScore / math.max(1d, baseScore), 0) + } +} + +object RateRankingValue { + def reduce(r1: RateRankingValue, r2: RateRankingValue): RateRankingValue = { + // maximum score and sum of increment + RateRankingValue(math.max(r1.actionScore, r2.actionScore), math.max(r1.baseScore, r2.baseScore)) + } +}
