http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala index d60641f..1c58086 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.core.types import org.apache.hadoop.hbase.util._ +import org.apache.s2graph.core.utils.logger object InnerVal extends HBaseDeserializableWithIsVertexId { import HBaseType._ @@ -39,7 +40,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { val INT = "integer" val SHORT = "short" val BYTE = "byte" - val NUMERICS = List(DOUBLE, FLOAT, LONG, INT, SHORT, BYTE) + val NUMERICS = Set(DOUBLE, FLOAT, LONG, INT, SHORT, BYTE) val BOOLEAN = "boolean" def isNumericType(dataType: String): Boolean = { @@ -218,15 +219,11 @@ trait InnerValLike extends HBaseSerializable { override def hashCode(): Int = value.hashCode() - override def equals(obj: Any): Boolean = { - obj match { - case other: InnerValLike => - val ret = toString == obj.toString -// logger.debug(s"InnerValLike.equals($this, $obj) => $ret") - ret - case _ => false - } + override def equals(obj: Any): Boolean = obj match { + case other: InnerValLike => value == other.value + case _ => false } + def hashKey(dataType: String): Int def toIdString(): String
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala index 96f87ed..9bb99ed 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala @@ -117,7 +117,7 @@ class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", DeferCache.addScheduleJob(delay = metricInterval) { logger.metric(s"${name}: ${cache.stats()}") } cache } else { - builder.build[java.lang.Long, (Long, M[A])]() + builder.recordStats().build[java.lang.Long, (Long, M[A])]() } } @@ -194,4 +194,7 @@ class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", checkAndExpire(cacheKey, cacheTTL, cachedAt, canDefer.future(cachedPromise))(op) } } + + def stats = futureCache.stats() + def size = futureCache.size() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala index 8ffd1dc..da5f635 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala @@ -26,7 +26,6 @@ import scala.concurrent.{ExecutionContext, Future, Promise} object Extensions { - def retryOnSuccess[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(shouldStop: T => Boolean)(implicit ex: ExecutionContext): Future[T] = n match { case i if n <= maxRetryNum => fn.flatMap { result => @@ -52,37 +51,67 @@ object Extensions { implicit class DeferOps[T](d: Deferred[T])(implicit ex: ExecutionContext) { + def map[R](dummy: => T)(op: T => R): Deferred[R] = { + val newDefer = new Deferred[R] - def withCallback[R](op: T => R): Deferred[R] = { - d.addCallback(new Callback[R, T] { - override def call(arg: T): R = op(arg) + d.addCallback(new Callback[T, T] { + override def call(arg: T): T = { + newDefer.callback(op(arg)) + arg + } }) - } - def recoverWith(op: Exception => T): Deferred[T] = { - d.addErrback(new Callback[Deferred[T], Exception] { - override def call(e: Exception): Deferred[T] = Deferred.fromResult(op(e)) + d.addErrback(new Callback[T, Exception] { + override def call(e: Exception): T = { + newDefer.callback(e) + dummy + } }) + + newDefer } + def mapWithFallback[R](dummy: => T)(fallback: Exception => R)(op: T => R): Deferred[R] = { + val newDefer = new Deferred[R] - def toFuture: Future[T] = { - val promise = Promise[T] + d.addCallback(new Callback[T, T] { + override def call(arg: T): T = { + newDefer.callback(op(arg)) + arg + } + }) - d.addBoth(new Callback[Unit, T] { - def call(arg: T) = arg match { - case e: Exception => promise.failure(e) - case _ => promise.success(arg) + d.addErrback(new Callback[T, Exception] { + override def call(e: Exception): T = { + newDefer.callback(fallback(e)) + dummy } }) - promise.future + newDefer } - def toFutureWith(fallback: => T): Future[T] = { - toFuture recoverWith { case t: Throwable => Future.successful(fallback) } - } + def toFuture(dummy: => T): Future[T] = { + val promise = Promise[T] + + val cb = new Callback[T, T] { + override def call(arg: T): T = { + promise.success(arg) + arg + } + } + + val eb = new Callback[T, Exception] { + override def call(e: Exception): T = { + promise.failure(e) + dummy + } + } + + d.addCallbacks(cb, eb) + promise.future + } } implicit class ConfigOps(config: Config) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index 0bc4554..13eb1a3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -20,11 +20,10 @@ package org.apache.s2graph.core.utils import java.util.concurrent.atomic.AtomicBoolean - import com.google.common.cache.CacheBuilder - import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +import scala.collection.JavaConversions._ object SafeUpdateCache { @@ -77,5 +76,11 @@ class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit execut } } } + + def getAllData() : List[(String, T)] = { + cache.asMap().map { case (key, value) => + (key.key.substring(prefix.size + 1), value._1) + }.toList + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala index 6933320..6321fef 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala @@ -29,7 +29,11 @@ import play.api.libs.json.{JsObject, Json} class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { initTests() + val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "test", 1.toByte, "true", "boolean") + val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "test", 3.toByte, "-1", "long") + test("toLogString") { + val testServiceName = serviceNameV2 val testLabelName = labelNameV2 val bulkQueries = List( ("1445240543366", "update", "{\"is_blocked\":true}"), @@ -45,12 +49,14 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString }).mkString("\n") + val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + testLabelName + + "\",\"service\":\"" + testServiceName + "\"" val expected = Seq( - Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{\"is_blocked\":true}"), - Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false}"), - Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false,\"weight\":10}"), - Seq("1445240543363", "delete", "e", "1", "2", testLabelName), - Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{\"time\":1,\"weight\":-10}") + Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_blocked\":true}"), + Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false}"), + Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false,\"weight\":10}"), + Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + attachedProps + "}"), + Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"time\":1,\"weight\":-10}") ).map(_.mkString("\t")).mkString("\n") assert(bulkEdge === expected) @@ -62,16 +68,16 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val srcVertex = Vertex(vertexId) val tgtVertex = srcVertex - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val newPropsWithTs = Map( timestampProp, - 1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) + testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) ) val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) @@ -88,16 +94,16 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val srcVertex = Vertex(vertexId) val tgtVertex = srcVertex - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val newPropsWithTs = Map( timestampProp, - 1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) + testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) ) val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) @@ -114,11 +120,11 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val srcVertex = Vertex(vertexId) val tgtVertex = srcVertex - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val newPropsWithTs = propsWithTs @@ -137,7 +143,7 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val srcVertex = Vertex(vertexId) val tgtVertex = srcVertex - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val oldPropsWithTs = Map( timestampProp, LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) @@ -145,14 +151,14 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val propsWithTs = Map( timestampProp, - 3.toByte -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 2), + testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 2), LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) ) val snapshotEdge = - Option(Edge(srcVertex, tgtVertex, labelWithDirV2, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) + Option(Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) logger.info(edgeMutate.toLogString) @@ -168,7 +174,7 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val srcVertex = Vertex(vertexId) val tgtVertex = srcVertex - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val oldPropsWithTs = Map( timestampProp, LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) @@ -176,14 +182,14 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val propsWithTs = Map( timestampProp, - 3.toByte -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 4), + testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 4), LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) ) val snapshotEdge = - Option(Edge(srcVertex, tgtVertex, labelWithDirV2, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) + Option(Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) logger.info(edgeMutate.toLogString) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala index 6054d67..a41152c 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala @@ -199,7 +199,7 @@ class CrudTest extends IntegrateCommon { object CrudHelper { class CrudTestRunner { - var seed = 0 + var seed = System.currentTimeMillis() def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = { for { @@ -271,7 +271,7 @@ class CrudTest extends IntegrateCommon { val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id) if (!rets.forall(identity)) { - Thread.sleep(graph.storage.LockExpireDuration + 100) + Thread.sleep(graph.LockExpireDuration + 100) /** expect current request would be ignored */ val bulkEdges = Seq(TestUtil.toEdge(i-1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())) val rets = TestUtil.insertEdgesSync(bulkEdges: _*) @@ -296,7 +296,7 @@ class CrudTest extends IntegrateCommon { val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id) if (!rets.forall(identity)) { - Thread.sleep(graph.storage.LockExpireDuration + 100) + Thread.sleep(graph.LockExpireDuration + 100) /** expect current request would be applied */ val bulkEdges = Seq(TestUtil.toEdge(i+1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())) val rets = TestUtil.insertEdgesSync(bulkEdges: _*) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index b341ec5..c84ad6e 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -23,7 +23,7 @@ import com.typesafe.config._ import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.core.rest.{RequestParser, RestHandler} import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{Graph, GraphUtil, Management, PostProcess} +import org.apache.s2graph.core._ import org.scalatest._ import play.api.libs.json.{JsValue, Json} @@ -90,7 +90,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { } } - val vertexPropsKeys = List("age" -> "int") + val vertexPropsKeys = List("age" -> "int", "im" -> "string") vertexPropsKeys.map { case (key, keyType) => Management.addVertexProp(testServiceName, testColumnName, key, keyType) @@ -129,10 +129,19 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { Await.result(future, HttpRequestWaitingTime) } + def getEdgesSync(s2Query: Query): JsValue = { + logger.info(s2Query.toString) + val stepResult = Await.result(graph.getEdges(s2Query), HttpRequestWaitingTime) + val result = PostProcess.toJson(Option(s2Query.jsonQuery))(graph, s2Query.queryOption, stepResult) +// val result = Await.result(graph.getEdges(s2Query).(PostProcess.toJson), HttpRequestWaitingTime) + logger.debug(s"${Json.prettyPrint(result)}") + result + } + def getEdgesSync(queryJson: JsValue): JsValue = { logger.info(Json.prettyPrint(queryJson)) val restHandler = new RestHandler(graph) - val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson), HttpRequestWaitingTime) + val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson(Option(queryJson))), HttpRequestWaitingTime) logger.debug(s"${Json.prettyPrint(result)}") result } @@ -301,7 +310,10 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { "tgtServiceName": "$testServiceName", "tgtColumnName": "$testTgtColumnName", "tgtColumnType": "string", - "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "indices": [ + {"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}, + {"name": "$index2", "propNames": ["time"]} + ], "props": [ { "name": "time", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala index 54bb12c..34f4d2c 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala @@ -19,9 +19,11 @@ package org.apache.s2graph.core.Integrate +import org.apache.s2graph.core.parsers.Where import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core._ import org.scalatest.BeforeAndAfterEach -import play.api.libs.json.{JsNumber, JsValue, Json} +import play.api.libs.json._ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { @@ -32,27 +34,178 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { val weight = "weight" val is_hidden = "is_hidden" - test("interval") { - def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - [ { - "label": "$testLabelName", - "index": "$index", - "interval": { - "from": [ { "$prop": $fromVal } ], - "to": [ { "$prop": $toVal } ] + def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "direction": "out", + "offset": $offset, + "limit": $limit } - } - ]] - } - """) + ]] + } + """) + + def queryGlobalLimit(id: Int, limit: Int): JsValue = Json.obj( + "limit" -> limit, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName + ) + ) + ) + ) + ) + + def getQuery(id: Int, where: String): Query = + Query( + vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)), + steps = Vector( + Step(Seq(QueryParam(testLabelName, where = Where(testLabelName, where)))) + ) + ) + + def queryIntervalWithParent(id: Int, index: String, prop: String, value: String) = + Query( + vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)), + steps = Vector( + Step(Seq(QueryParam(testLabelName, indexName = index))), + Step(Seq(QueryParam(testLabelName, indexName = index, + intervalOpt = Option(Seq(prop -> JsString(value)), Seq(prop -> JsString(value))))) + ) + ) + ) + + def queryIntervalWithParentRange(id: Int, index: String, + prop: String, value: String, + toProp: String, toValue: String) = + Query( + vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)), + steps = Vector( + Step(Seq(QueryParam(testLabelName, indexName = index))), + Step(Seq(QueryParam(testLabelName, indexName = index, + intervalOpt = Option(Seq(prop -> JsString(value)), Seq(toProp -> JsString(toValue))))) + ) + ) + ) + + def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = + Query( + vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)), + steps = Vector( + Step(Seq(QueryParam(testLabelName, indexName = index, + intervalOpt = Option(Seq(prop -> JsNumber(fromVal)), Seq(prop -> JsNumber(toVal)))))) + ) + ) + + def queryExclude(id: Int) = + Query( + vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)), + steps = Vector( + Step( + Seq( + QueryParam(testLabelName, limit = 2), + QueryParam(testLabelName, direction = "in", limit = 2, exclude = true) + ) + ) + ) + ) + + def queryGroupBy(id: Int, props: Seq[String]) = + Query( + vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)), + steps = Vector( + Step( + Seq(QueryParam(testLabelName)) + ) + ), + queryOption = QueryOption(groupBy = GroupBy(props, 100)) + ) + + test("query with defaultValue") { + // ref: edges from initTestData() + + // no default value + var edges = getEdgesSync(getQuery(0, "_to = 1")) + (edges \\ "is_hidden").head.as[Boolean] should be(true) + + // default value(weight, is_hidden) + edges = getEdgesSync(getQuery(-1, "_to = 1000")) + (edges \\ "is_hidden").head.as[Boolean] should be(false) + (edges \\ "weight").head.as[Long] should be(0) + + // default value(is_hidden) + edges = getEdgesSync(getQuery(10, "_to = 20")) + (edges \\ "is_hidden").head.as[Boolean] should be(false) + } + + test("degree with `Where clause") { + val edges = getEdgesSync(getQuery(2, "_from != 2")) + (edges \ "degrees").as[Seq[JsValue]].nonEmpty should be(true) + } + + test("interval parent") { + val baseId = 1024 + insertEdgesSync( + toEdge(20, insert, e, baseId, baseId + 1, testLabelName, Json.obj(weight -> 30, is_hidden -> true)), + + toEdge(10, insert, e, baseId + 1, baseId + 10, testLabelName, Json.obj(weight -> 30, is_hidden -> true)), + toEdge(20, insert, e, baseId + 1, baseId + 20, testLabelName, Json.obj(weight -> 30, is_hidden -> true)), + toEdge(30, insert, e, baseId + 1, baseId + 30, testLabelName, Json.obj(weight -> 30, is_hidden -> true)) + ) + + val edges = getEdgesSync(queryIntervalWithParent(baseId, index2, "_timestamp", "_parent._timestamp")) + (edges \ "size").get.toString should be("1") + + val to = (edges \\ "to").head.as[Long] + to should be (baseId + 20) + } + + test("interval parent with range") { + val baseId = 9876 + + val minute: Long = 60 * 1000L + val hour = 60 * minute + + insertEdgesSync( + toEdge(1, insert, e, baseId, baseId + 1, testLabelName, Json.obj(weight -> 30, is_hidden -> true)), + toEdge(1 + hour * 2, insert, e, baseId + 1, baseId + 10, testLabelName, Json.obj(weight -> 30, is_hidden -> true)), + toEdge(1 + hour * 3, insert, e, baseId + 1, baseId + 20, testLabelName, Json.obj(weight -> 30, is_hidden -> true)), + toEdge(1 + hour * 4, insert, e, baseId + 1, baseId + 30, testLabelName, Json.obj(weight -> 30, is_hidden -> true)) + ) + + val edges = getEdgesSync(queryIntervalWithParentRange(baseId, index2, + "_timestamp", "${_parent._timestamp}", + "_timestamp", "${_parent._timestamp + 3 hour}")) + + (edges \ "size").get.toString should be("2") + + val edges2 = getEdgesSync(queryIntervalWithParentRange(baseId, index2, + "_timestamp", "${_parent._timestamp}", + "_timestamp", "${_parent._timestamp + 2 hour}")) + + (edges2 \ "size").get.toString should be("1") + + val edges3 = getEdgesSync(queryIntervalWithParentRange(baseId, index2, + "_timestamp", "${_parent._timestamp + 130 minute}", + "_timestamp", "${_parent._timestamp + 4 hour}")) + + (edges3 \ "size").get.toString should be("2") + } + + test("interval") { var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index (edges \ "size").get.toString should be("1") @@ -67,88 +220,29 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { } test("get edge with where condition") { - def queryWhere(id: Int, where: String) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 100, - "where": "${where}" - } - ]] - }""") - var result = getEdgesSync(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) + var result = getEdgesSync(getQuery(0, "is_hidden=false and _from in (-1, 0)")) (result \ "results").as[List[JsValue]].size should be(1) - result = getEdgesSync(queryWhere(0, "is_hidden=true and _to in (1)")) + result = getEdgesSync(getQuery(0, "is_hidden=true and _to in (1)")) (result \ "results").as[List[JsValue]].size should be(1) - result = getEdgesSync(queryWhere(0, "_from=0")) + result = getEdgesSync(getQuery(0, "_from=0")) (result \ "results").as[List[JsValue]].size should be(2) - result = getEdgesSync(queryWhere(2, "_from=2 or weight in (-1)")) + result = getEdgesSync(getQuery(2, "_from=2 or weight in (-1)")) (result \ "results").as[List[JsValue]].size should be(2) - result = getEdgesSync(queryWhere(2, "_from=2 and weight in (10, 20)")) + result = getEdgesSync(getQuery(2, "_from=2 and weight in (10, 20)")) (result \ "results").as[List[JsValue]].size should be(2) } test("get edge exclude") { - def queryExclude(id: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 2 - }, - { - "label": "${testLabelName}", - "direction": "in", - "offset": 0, - "limit": 2, - "exclude": true - } - ]] - }""") - val result = getEdgesSync(queryExclude(0)) (result \ "results").as[List[JsValue]].size should be(1) } test("get edge groupBy property") { - def queryGroupBy(id: Int, props: Seq[String]): JsValue = { - Json.obj( - "groupBy" -> props, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName - ) - ) - ) - ) - ) - } - val result = getEdgesSync(queryGroupBy(0, Seq("weight"))) (result \ "size").as[Int] should be(2) val weights = (result \ "results" \\ "groupBy").map { js => @@ -235,8 +329,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { // } // } - - test("duration") { def queryDuration(ids: Seq[Int], from: Int, to: Int) = { val $from = Json.arr( @@ -285,11 +377,36 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { } - test("return tree") { + def queryParentsWithoutSelect(id: Long) = Json.parse( + s""" + { + "returnTree": true, + "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 2 + } + ],[{ + "label": "$testLabelName", + "direction": "in", + "offset": 0, + "limit": 1000 + } + ]] + }""".stripMargin) + def queryParents(id: Long) = Json.parse( s""" { + "select": ["weight"], "returnTree": true, "srcVertices": [ { "serviceName": "$testServiceName", @@ -317,15 +434,45 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) - val result = TestUtil.getEdgesSync(queryParents(src)) - val parents = (result \ "results").as[Seq[JsValue]] - val ret = parents.forall { - edge => (edge \ "parents").as[Seq[JsValue]].size == 1 + // test parent With select fields + var result = TestUtil.getEdgesSync(queryParents(src)) + var parents = (result \ "results").as[Seq[JsValue]] + var ret = parents.forall { edge => + val parentEdges = (edge \ "parents").as[Seq[JsValue]] + val assertSize = parentEdges.size == 1 + val parentProps = (parentEdges.head \ "props").as[JsObject] + val parentWeight = (parentProps \ "weight").as[Long] + val parentIsHidden = (parentProps \ "is_hidden").asOpt[Boolean] + + val assertProp = parentWeight == 0 && parentIsHidden.isEmpty // select only "weight" + + assertSize && assertProp } ret should be(true) - } + // test parent With select fields: check default Prop + result = TestUtil.getEdgesSync(queryParentsWithoutSelect(src)) + parents = (result \ "results").as[Seq[JsValue]] + ret = parents.forall { edge => + val parentEdges = (edge \ "parents").as[Seq[JsValue]] + val assertSize = parentEdges.size == 1 + + val parentProps = (parentEdges.head \ "props").as[JsObject] + + val parentWeight = (parentProps \ "weight").as[Int] + val parentIsHidden = (parentProps \ "is_hidden").as[Boolean] + val parentIsBlocked = (parentProps \ "is_blocked").as[Boolean] + val parentTime = (parentProps \ "time").as[Long] + + val assertProp = + parentWeight == 0 && parentIsHidden == false && parentIsBlocked == false && parentTime == 0 + + assertSize && assertProp + } + + ret should be(true) + } test("pagination and _to") { @@ -438,7 +585,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { edgesTo.reverse should be(ascOrderByTo) } - test("query with sampling") { def queryWithSampling(id: Int, sample: Int) = Json.parse( s""" @@ -757,7 +903,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { | { | "step": [ | { - | "label": "$testLabelName2", + | "label": "$testLabelName", | "direction": "out", | "offset": 0, | "limit": 5 @@ -778,9 +924,9 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), - toEdge(4, insert, e, testId2, 111, testLabelName2, Json.obj(weight -> 1)), - toEdge(5, insert, e, testId2, 333, testLabelName2, Json.obj(weight -> 1)), - toEdge(6, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)) + toEdge(4, insert, e, testId2, 111, testLabelName, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 333, testLabelName, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 555, testLabelName, Json.obj(weight -> 1)) ) insertEdgesSync(bulkEdges: _*) @@ -859,196 +1005,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { results.size should be(4) } - test("scorePropagateOp test") { - def queryWithPropertyOp(id: String, op: String, shrinkageVal: Long) = Json.parse( - s"""{ - | "limit": 10, - | "groupBy": ["from"], - | "duplicate": "sum", - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10, - | "groupBy": ["from"], - | "duplicate": "sum", - | "index": "idx_1", - | "scoring": { - | "weight":1, - | "time": 0 - | }, - | "transform": [["_from"]] - | } - | ] - | }, { - | "step": [ - | { - | "label": "$testLabelName2", - | "direction": "out", - | "offset": 0, - | "limit": 10, - | "scorePropagateOp": "$op", - | "scorePropagateShrinkage": $shrinkageVal - | } - | ] - | } - | ] - |} - """.stripMargin - ) - - def queryWithOp(ids: Seq[String], op: String, shrinkageVal: Long) = Json.parse( - s"""{ - | "limit": 10, - | "groupBy": ["from"], - | "duplicate": "sum", - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "ids": [${ids.mkString(",")}] - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10, - | "groupBy": ["from"], - | "duplicate": "countSum", - | "transform": [["_from"]] - | } - | ] - | }, { - | "step": [ - | { - | "label": "$testLabelName2", - | "direction": "out", - | "offset": 0, - | "limit": 10, - | "scorePropagateOp": "$op", - | "scorePropagateShrinkage": $shrinkageVal - | } - | ] - | } - | ] - |} - """.stripMargin - ) - - val testId = "-30000" - val testId2 = "-4000" - - val bulkEdges = Seq( - toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> -10)), - toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> -10)), - toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> -10)), - toEdge(1, insert, e, testId, 102, testLabelName2, Json.obj(weight -> 10)), - toEdge(1, insert, e, testId, 103, testLabelName2, Json.obj(weight -> 10)), - toEdge(1, insert, e, testId, 104, testLabelName2, Json.obj(weight -> 10)), - toEdge(1, insert, e, testId, 105, testLabelName2, Json.obj(weight -> 10)), - - toEdge(1, insert, e, testId2, 101, testLabelName, Json.obj(weight -> -10)), - toEdge(1, insert, e, testId2, 102, testLabelName, Json.obj(weight -> -10)), - toEdge(1, insert, e, testId2, 103, testLabelName, Json.obj(weight -> -10)), - toEdge(1, insert, e, testId2, 102, testLabelName2, Json.obj(weight -> 10)), - toEdge(1, insert, e, testId2, 105, testLabelName2, Json.obj(weight -> 10)) - ) - insertEdgesSync(bulkEdges: _*) - - val firstStepEdgeCount = 3l - val secondStepEdgeCount = 4l - - var shrinkageVal = 10l - var rs = getEdgesSync(queryWithOp(Seq(testId), "divide", shrinkageVal)) - - var results = (rs \ "results").as[List[JsValue]] - results.size should be(1) - var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal) - (results(0) \ "scoreSum").as[Double] should be(scoreSum) - - rs = getEdgesSync(queryWithOp(Seq(testId, testId2), "divide", shrinkageVal)) - - results = (rs \ "results").as[List[JsValue]] - results.size should be(2) - scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal) - (results(0) \ "scoreSum").as[Double] should be(scoreSum) - scoreSum = 2.toDouble / (3.toDouble + shrinkageVal) - (results(1) \ "scoreSum").as[Double] should be(scoreSum) - - // check for divide zero case - shrinkageVal = 30l - rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal)) - - results = (rs \ "results").as[List[JsValue]] - results.size should be(1) - (results(0) \ "scoreSum").as[Double] should be(0) - - // "plus" operation - rs = getEdgesSync(queryWithOp(Seq(testId), "plus", shrinkageVal)) - - results = (rs \ "results").as[List[JsValue]] - results.size should be(1) - scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount - (results(0) \ "scoreSum").as[Long] should be(scoreSum) - - // "multiply" operation - rs = getEdgesSync(queryWithOp(Seq(testId), "multiply", shrinkageVal)) - logger.debug(Json.prettyPrint(rs)) - results = (rs \ "results").as[List[JsValue]] - results.size should be(1) - scoreSum = (firstStepEdgeCount * 1) * secondStepEdgeCount - (results(0) \ "scoreSum").as[Long] should be(scoreSum) - } - - def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - [ { - "label": "$testLabelName", - "direction": "out", - "offset": $offset, - "limit": $limit - } - ]] - } - """) - - def queryGlobalLimit(id: Int, limit: Int): JsValue = Json.obj( - "limit" -> limit, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName - ) - ) - ) - ) - ) - - // called by each test, each override def beforeEach = initTestData() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala index d62dee8..dc5dc2e 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.Integrate import java.util.concurrent.TimeUnit import org.scalatest.BeforeAndAfterEach -import play.api.libs.json.{JsObject, JsValue, Json} +import play.api.libs.json._ import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -130,7 +130,9 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""), toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""), toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""), - toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""") + toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}"""), + toEdge(startTs + 9, "insert", "e", "10000", "20000", testLabelNameWeak, + s"""{"time": 1, "weight": 0.1, "is_hidden": true, "is_blocked": false}""") ) def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala index e21f0e7..afc9aea 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala @@ -20,9 +20,73 @@ package org.apache.s2graph.core import org.apache.s2graph.core.Integrate.IntegrateCommon -import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core.mysqls.{Model, Label, Service} + +import scala.util.{Failure, Success} +import play.api.libs.json.{JsValue, Json} class ManagementTest extends IntegrateCommon { + + + def checkCopyLabel(originalLabelName: String, newLabelName: String) = { + val originalLabelOpt = Label.findByName(originalLabelName, useCache = true) + originalLabelOpt.isDefined should be(true) + val originalLabel = originalLabelOpt.get + + val labelTry = management.copyLabel(originalLabelName, newLabelName, hTableName = Option(newLabelName)) + labelTry.isSuccess should be(true) + val copiedLabel = labelTry.get + copiedLabel.label should be(newLabelName) + copiedLabel.id.get != originalLabel.id.get should be(true) + copiedLabel.hTableTTL should equal(originalLabel.hTableTTL) + + val copiedLabelMetaMap = copiedLabel.metas(useCache = false).map(m => m.seq -> m.name).toMap + val copiedLabelIndiceMap = copiedLabel.indices(useCache = false).map(m => m.seq -> m.metaSeqs).toMap + val originalLabelMetaMap = originalLabel.metas(useCache = false).map(m => m.seq -> m.name).toMap + val originalLabelIndiceMap = originalLabel.indices(useCache = false).map(m => m.seq -> m.metaSeqs).toMap + + copiedLabelMetaMap should be(originalLabelMetaMap) + copiedLabelIndiceMap should be(originalLabelIndiceMap) + + copiedLabel.metas().sortBy(m => m.id.get).map(m => m.name) should be(originalLabel.metas().sortBy(m => m.id.get).map(m => m.name)) + copiedLabel.indices().sortBy(m => m.id.get).map(m => m.metaSeqs) should be(originalLabel.indices().sortBy(m => m.id.get).map(m => m.metaSeqs)) + } + + def checkLabelTTL(labelName:String, serviceName:String, setTTL:Option[Int], checkTTL:Option[Int]) = { + Management.deleteLabel(labelName) + val ttlOption = if(setTTL.isDefined) s""", "hTableTTL": ${setTTL.get}""" else "" + val createLabelJson = s"""{ + "label": "$labelName", + "srcServiceName": "$serviceName", + "srcColumnName": "id", + "srcColumnType": "long", + "tgtServiceName": "$serviceName", + "tgtColumnName": "id", + "tgtColumnType": "long", + "indices":[], + "props":[], + "hTableName": "$labelName" + $ttlOption + }""" + val labelOpts = parser.toLabelElements(Json.parse(createLabelJson)) + val tryLabel = (management.createLabel _).tupled(labelOpts.get) + assert(tryLabel.isSuccess) + val label = tryLabel.get + label.hTableTTL should be(checkTTL) + } + + test("copy label test") { + val labelToCopy = s"${TestUtil.testLabelName}_copied" + Label.findByName(labelToCopy) match { + case None => + // + case Some(oldLabel) => + Label.delete(oldLabel.id.get) + + } + checkCopyLabel(TestUtil.testLabelName, labelToCopy) + } + test("swap label test") { val labelLeft = TestUtil.testLabelName val labelRight = TestUtil.testLabelName2 @@ -33,4 +97,53 @@ class ManagementTest extends IntegrateCommon { Label.findByName(labelLeft, false).get.schemaVersion should be("v4") Label.findByName(labelRight, false).get.schemaVersion should be("v3") } -} \ No newline at end of file + + test("check created service without ttl") { + // createService + val svc_without_ttl = "s2graph_without_ttl" + val createServiceJson = s"""{"serviceName" : "$svc_without_ttl"}""" + val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = + parser.toServiceElements(Json.parse(createServiceJson)) + + val tryService = management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + assert(tryService.isSuccess) + val service = tryService.get + assert(service.hTableTTL.isDefined) + service.hTableTTL.get should be(Integer.MAX_VALUE) + + // check labels + checkLabelTTL("label_without_ttl", svc_without_ttl, None, service.hTableTTL) + checkLabelTTL("label_with_ttl", svc_without_ttl, Some(86400), Some(86400)) + + // check copied labels + Management.deleteLabel("label_without_ttl_copied") + checkCopyLabel("label_without_ttl", "label_without_ttl_copied") + Management.deleteLabel("label_with_ttl_copied") + checkCopyLabel("label_with_ttl", "label_with_ttl_copied") + } + + test("check created service with ttl") { + // createService + val svc_with_ttl = "s2graph_with_ttl" + val ttl_val = 86400 + val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = + parser.toServiceElements(Json.parse(s"""{"serviceName" : "$svc_with_ttl", "hTableTTL":$ttl_val}""")) + + val tryService = management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + assert(tryService.isSuccess) + val service = tryService.get + assert(service.hTableTTL.isDefined) + service.hTableTTL.get should be(ttl_val) + + // check labels + checkLabelTTL("label_without_ttl", svc_with_ttl, None, service.hTableTTL) + checkLabelTTL("label_with_ttl", svc_with_ttl, Some(Integer.MAX_VALUE), Some(Integer.MAX_VALUE)) + + // check copied labels + Management.deleteLabel("label_without_ttl_copied") + checkCopyLabel("label_without_ttl", "label_without_ttl_copied") + Management.deleteLabel("label_with_ttl_copied") + checkCopyLabel("label_with_ttl", "label_with_ttl_copied") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala index 61d1096..772fcd8 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala @@ -1,159 +1,159 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core - -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.types.{InnerVal, InnerValLike, HBaseSerializable, LabelWithDirection} -import org.scalatest.{FunSuite, Matchers} - -class QueryParamTest extends FunSuite with Matchers with TestCommon { -// val version = HBaseType.VERSION2 -// val testEdge = Management.toEdge(ts, "insert", "1", "10", labelNameV2, "out", Json.obj("is_blocked" -> true, "phone_number" -> "xxxx", "age" -> 20).toString) -// test("EdgeTransformer toInnerValOpt") { -// -// /** only labelNameV2 has string type output */ -// val jsVal = Json.arr(Json.arr("_to"), Json.arr("phone_number.$", "phone_number"), Json.arr("age.$", "age")) -// val transformer = EdgeTransformer(queryParamV2, jsVal) -// val convertedLs = transformer.transform(testEdge, None) -// -// convertedLs(0).tgtVertex.innerId.toString == "10" shouldBe true -// convertedLs(1).tgtVertex.innerId.toString == "phone_number.xxxx" shouldBe true -// convertedLs(2).tgtVertex.innerId.toString == "age.20" shouldBe true -// true +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +//package org.apache.s2graph.core +// +//import org.apache.hadoop.hbase.util.Bytes +//import org.apache.s2graph.core.types.{InnerVal, InnerValLike, HBaseSerializable, LabelWithDirection} +//import org.scalatest.{FunSuite, Matchers} +// +//class QueryParamTest extends FunSuite with Matchers with TestCommon { +//// val version = HBaseType.VERSION2 +//// val testEdge = Management.toEdge(ts, "insert", "1", "10", labelNameV2, "out", Json.obj("is_blocked" -> true, "phone_number" -> "xxxx", "age" -> 20).toString) +//// test("EdgeTransformer toInnerValOpt") { +//// +//// /** only labelNameV2 has string type output */ +//// val jsVal = Json.arr(Json.arr("_to"), Json.arr("phone_number.$", "phone_number"), Json.arr("age.$", "age")) +//// val transformer = EdgeTransformer(queryParamV2, jsVal) +//// val convertedLs = transformer.transform(testEdge, None) +//// +//// convertedLs(0).tgtVertex.innerId.toString == "10" shouldBe true +//// convertedLs(1).tgtVertex.innerId.toString == "phone_number.xxxx" shouldBe true +//// convertedLs(2).tgtVertex.innerId.toString == "age.20" shouldBe true +//// true +//// } +// +// val dummyRequests = { +// for { +// id <- 0 until 1000 +// } yield { +// Bytes.toBytes(id) +// } +// } +// +// test("QueryParam toCacheKey bytes") { +// val startedAt = System.nanoTime() +// val queryParam = S2QueryParam(LabelWithDirection(1, 0)) +// +// for { +// i <- dummyRequests.indices +// x = queryParam.toCacheKey(dummyRequests(i)) +// } { +// for { +// j <- dummyRequests.indices if i != j +// y = queryParam.toCacheKey(dummyRequests(j)) +// } { +// x should not equal y +// } +// } +// +// dummyRequests.zip(dummyRequests).foreach { case (x, y) => +// val xHash = queryParam.toCacheKey(x) +// val yHash = queryParam.toCacheKey(y) +//// println(xHash, yHash) +// xHash should be(yHash) +// } +// val duration = System.nanoTime() - startedAt +// +// println(s">> bytes: $duration") +// } +// +// test("QueryParam toCacheKey with variable params") { +// val startedAt = System.nanoTime() +// val queryParam = S2QueryParam(LabelWithDirection(1, 0)) +// +// dummyRequests.zip(dummyRequests).foreach { case (x, y) => +// x shouldBe y +// queryParam.limit(0, 10) +// var xHash = queryParam.toCacheKey(x) +// xHash shouldBe queryParam.toCacheKey(y) +// queryParam.limit(1, 10) +// var yHash = queryParam.toCacheKey(y) +// queryParam.toCacheKey(x) shouldBe yHash +//// println(xHash, yHash) +// xHash should not be yHash +// +// queryParam.limit(0, 10) +// xHash = queryParam.toCacheKey(x) +// queryParam.limit(0, 11) +// yHash = queryParam.toCacheKey(y) +// +// xHash should not be yHash +// } +// +// val duration = System.nanoTime() - startedAt +// +// println(s">> diff: $duration") +// } +// test("QueryParam interval min/max bytes padding test") { +// import HBaseSerializable._ +// val queryParam = QueryParam.Empty +// def compare(_from: Seq[InnerValLike], _to: Seq[InnerValLike], _value: Seq[InnerValLike]): Boolean = { +// val len = _from.length.toByte +// +// val from = _from.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } +// val to = _to.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } +// val value = _value.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } +// +// val (fromBytes, toBytes) = queryParam.paddingInterval(len, from, to) +// val valueBytes = propsToBytes(value) +// +// val validFrom = Bytes.compareTo(fromBytes, valueBytes) <= 0 +// val validTo = Bytes.compareTo(toBytes, valueBytes) >= 0 +// +// val res = validFrom && validTo +// // if (!res) logger.error(s"from: $validFrom, to: $validTo, from: ${_from} to: ${_to} value: ${_value}") +// res +// } +// +// val v = "v3" +// compare( +// Seq(InnerVal.withLong(0L, v)), +// Seq(InnerVal.withLong(0L, v)), +// Seq(InnerVal.withLong(0L, v))) shouldBe true +// +// compare( +// Seq(InnerVal.withLong(0L, v)), +// Seq(InnerVal.withLong(0L, v)), +// Seq(InnerVal.withLong(1L, v))) shouldBe false +// +// compare( +// Seq(InnerVal.withLong(1L, v)), +// Seq(InnerVal.withLong(1L, v)), +// Seq(InnerVal.withLong(0L, v))) shouldBe false +// +// compare( +// Seq(InnerVal.withLong(0L, v)), +// Seq(InnerVal.withLong(1L, v)), +// Seq(InnerVal.withLong(2L, v))) shouldBe false +// +// val testNum = 100000 +// val tests = for { +// n <- 0 to testNum +// min = scala.util.Random.nextInt(Int.MaxValue / 2) + 1 +// max = min + scala.util.Random.nextInt(min) +// value = min + scala.util.Random.nextInt(max - min + 1) +// } yield compare( +// Seq(InnerVal.withLong(min, v)), +// Seq(InnerVal.withLong(max, v)), +// Seq(InnerVal.withLong(value, v))) +// +// tests.forall(identity) shouldBe true // } - - val dummyRequests = { - for { - id <- 0 until 1000 - } yield { - Bytes.toBytes(id) - } - } - - test("QueryParam toCacheKey bytes") { - val startedAt = System.nanoTime() - val queryParam = QueryParam(LabelWithDirection(1, 0)) - - for { - i <- dummyRequests.indices - x = queryParam.toCacheKey(dummyRequests(i)) - } { - for { - j <- dummyRequests.indices if i != j - y = queryParam.toCacheKey(dummyRequests(j)) - } { - x should not equal y - } - } - - dummyRequests.zip(dummyRequests).foreach { case (x, y) => - val xHash = queryParam.toCacheKey(x) - val yHash = queryParam.toCacheKey(y) -// println(xHash, yHash) - xHash should be(yHash) - } - val duration = System.nanoTime() - startedAt - - println(s">> bytes: $duration") - } - - test("QueryParam toCacheKey with variable params") { - val startedAt = System.nanoTime() - val queryParam = QueryParam(LabelWithDirection(1, 0)) - - dummyRequests.zip(dummyRequests).foreach { case (x, y) => - x shouldBe y - queryParam.limit(0, 10) - var xHash = queryParam.toCacheKey(x) - xHash shouldBe queryParam.toCacheKey(y) - queryParam.limit(1, 10) - var yHash = queryParam.toCacheKey(y) - queryParam.toCacheKey(x) shouldBe yHash -// println(xHash, yHash) - xHash should not be yHash - - queryParam.limit(0, 10) - xHash = queryParam.toCacheKey(x) - queryParam.limit(0, 11) - yHash = queryParam.toCacheKey(y) - - xHash should not be yHash - } - - val duration = System.nanoTime() - startedAt - - println(s">> diff: $duration") - } - test("QueryParam interval min/max bytes padding test") { - import HBaseSerializable._ - val queryParam = QueryParam.Empty - def compare(_from: Seq[InnerValLike], _to: Seq[InnerValLike], _value: Seq[InnerValLike]): Boolean = { - val len = _from.length.toByte - - val from = _from.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } - val to = _to.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } - val value = _value.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } - - val (fromBytes, toBytes) = queryParam.paddingInterval(len, from, to) - val valueBytes = propsToBytes(value) - - val validFrom = Bytes.compareTo(fromBytes, valueBytes) <= 0 - val validTo = Bytes.compareTo(toBytes, valueBytes) >= 0 - - val res = validFrom && validTo - // if (!res) logger.error(s"from: $validFrom, to: $validTo, from: ${_from} to: ${_to} value: ${_value}") - res - } - - val v = "v3" - compare( - Seq(InnerVal.withLong(0L, v)), - Seq(InnerVal.withLong(0L, v)), - Seq(InnerVal.withLong(0L, v))) shouldBe true - - compare( - Seq(InnerVal.withLong(0L, v)), - Seq(InnerVal.withLong(0L, v)), - Seq(InnerVal.withLong(1L, v))) shouldBe false - - compare( - Seq(InnerVal.withLong(1L, v)), - Seq(InnerVal.withLong(1L, v)), - Seq(InnerVal.withLong(0L, v))) shouldBe false - - compare( - Seq(InnerVal.withLong(0L, v)), - Seq(InnerVal.withLong(1L, v)), - Seq(InnerVal.withLong(2L, v))) shouldBe false - - val testNum = 100000 - val tests = for { - n <- 0 to testNum - min = scala.util.Random.nextInt(Int.MaxValue / 2) + 1 - max = min + scala.util.Random.nextInt(min) - value = min + scala.util.Random.nextInt(max - min + 1) - } yield compare( - Seq(InnerVal.withLong(min, v)), - Seq(InnerVal.withLong(max, v)), - Seq(InnerVal.withLong(value, v))) - - tests.forall(identity) shouldBe true - } -} +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala index 584a641..12eae77 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala @@ -85,6 +85,7 @@ trait TestCommonWithModels { val preSplitSize = 0 val labelName = "_test_label" + val labelNameSecure = "_test_label_secure" val labelNameV2 = "_test_label_v2" val labelNameV3 = "_test_label_v3" val labelNameV4 = "_test_label_v4" @@ -102,6 +103,7 @@ trait TestCommonWithModels { Prop("score", "0.1", FLOAT), Prop("age", "10", INT) ) + val testIdxProps = Seq(Index("_PK", Seq("_timestamp", "affinity_score"))) val consistencyLevel = "strong" val hTableTTL = None @@ -129,10 +131,12 @@ trait TestCommonWithModels { Management.deleteLabel(labelNameV2) Management.deleteLabel(undirectedLabelName) Management.deleteLabel(undirectedLabelNameV2) + Management.deleteLabel(labelNameSecure) } def createTestLabel() = { implicit val session = AutoSession + management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType, isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None) @@ -150,6 +154,10 @@ trait TestCommonWithModels { management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None) + + management.createLabel(labelNameSecure, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType, + isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", + Option("""{ "tokens": ["xxx-yyy", "aaa-bbb"] }""")) } def service = Service.findByName(serviceName, useCache = false).get http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala index 66d84e4..87a84ae 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala @@ -32,34 +32,6 @@ class ModelTest extends FunSuite with Matchers with TestCommonWithModels with Be graph.shutdown() } - // val serviceName = "testService" - // val newServiceName = "newTestService" - // val cluster = "localhost" - // val hbaseTableName = "s2graph-dev" - // val columnName = "user_id" - // val columnType = "long" - // val labelName = "model_test_label" - // val newLabelName = "new_model_test_label" - // val columnMetaName = "is_valid_user" - // val labelMetaName = "is_hidden" - // val hbaseTableTTL = -1 - // val id = 1 - // - // val service = HService(Map("id" -> id, "serviceName" -> serviceName, "cluster" -> cluster, - // "hbaseTableName" -> hbaseTableName, "preSplitSize" -> 0, "hbaseTableTTL" -> -1)) - // val serviceColumn = HServiceColumn(Map("id" -> id, "serviceId" -> service.id.get, - // "columnName" -> columnName, "columnType" -> columnType)) - // val columnMeta = HColumnMeta(Map("id" -> id, "columnId" -> serviceColumn.id.get, "name" -> columnMetaName, "seq" -> 1.toByte)) - // val label = HLabel(Map("id" -> id, "label" -> labelName, - // "srcServiceId" -> service.id.get, "srcColumnName" -> columnName, "srcColumnType" -> columnType, - // "tgtServiceId" -> service.id.get, "tgtColumnName" -> columnName, "tgtColumnType" -> columnType, - // "isDirected" -> true, "serviceName" -> service.serviceName, "serviceId" -> service.id.get, - // "consistencyLevel" -> "weak", "hTableName" -> hbaseTableName, "hTableTTL" -> -1 - // )) - // val labelMeta = HLabelMeta(Map("id" -> id, "labelId" -> label.id.get, "name" -> labelMetaName, "seq" -> 1.toByte, - // "defaultValue" -> false, "dataType" -> "boolean", "usedInIndex" -> false)) - // val labelIndex = HLabelIndex(Map("id" -> id, "labelId" -> label.id.get, "seq" -> 1.toByte, - // "metaSeqs" -> "0", "formular" -> "none")) test("test Label.findByName") { val labelOpt = Label.findByName(labelName, useCache = false) println(labelOpt) @@ -84,66 +56,4 @@ class ModelTest extends FunSuite with Matchers with TestCommonWithModels with Be val tgtColumn = labelOpt.get.tgtService println(tgtColumn) } - // test("test create") { - // service.create() - // HService.findByName(serviceName, useCache = false) == Some(service) - // - // serviceColumn.create() - // HServiceColumn.findsByServiceId(service.id.get, useCache = false).headOption == Some(serviceColumn) - // - // columnMeta.create() - // HColumnMeta.findByName(serviceColumn.id.get, columnMetaName, useCache = false) == Some(columnMeta) - // - // label.create() - // HLabel.findByName(labelName, useCache = false) == Some(label) - // - // labelMeta.create() - // HLabelMeta.findByName(label.id.get, labelMetaName, useCache = false) == Some(labelMeta) - // - // labelIndex.create() - // HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).headOption == Some(labelIndex) - // } - // - // test("test update") { - // service.update("cluster", "...") - // HService.findById(service.id.get, useCache = false).cluster == "..." - // - // service.update("serviceName", newServiceName) - // assert(HService.findByName(serviceName, useCache = false) == None) - // HService.findByName(newServiceName, useCache = false).map { service => service.id.get == service.id.get} - // - // label.update("label", newLabelName) - // HLabel.findById(label.id.get, useCache = false).label == "newLabelName" - // - // label.update("consistencyLevel", "strong") - // HLabel.findById(label.id.get, useCache = false).consistencyLevel == "strong" && - // HLabel.findByName(newLabelName).isDefined && - // HLabel.findByName(labelName) == None - // - // } - // test("test read by index") { - // val labels = HLabel.findBySrcServiceId(service.id.get, useCache = false) - // val idxs = HLabelIndex.findByLabelIdAll(label.id.get, useCache = false) - // labels.length == 1 && - // labels.head == label - // idxs.length == 1 && - // idxs.head == labelIndex - // } - // test("test delete") { - //// HLabel.findByName(labelName).foreach { label => - //// label.deleteAll() - //// } - // HLabel.findByName(newLabelName).foreach { label => - // label.deleteAll() - // } - // HLabelMeta.findAllByLabelId(label.id.get, useCache = false).isEmpty && - // HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).isEmpty - // - // service.deleteAll() - // } - - // test("test labelIndex") { - // println(HLabelIndex.findByLabelIdAll(1)) - // } - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala deleted file mode 100644 index 229d9bd..0000000 --- a/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.mysqls - -import java.util.Properties - -import com.typesafe.config.ConfigFactory -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import scalikejdbc._ - -class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - val Ttl = 2 - override def beforeAll(): Unit = { - /* - maxSize = config.getInt("cache.max.size") - ttl = config.getInt("cache.ttl.seconds") - */ - val props = new Properties() - props.setProperty("cache.ttl.seconds", Ttl.toString) - Model.apply(ConfigFactory.load(ConfigFactory.parseProperties(props))) - - implicit val session = AutoSession - sql"""DELETE FROM buckets""".update().apply() - sql"""DELETE FROM experiments""".update().apply() - - val expId = sql"""INSERT INTO experiments(service_id, service_name, name, description) VALUES(1, 's1', 'exp1', '')""".updateAndReturnGeneratedKey().apply() - sql"""INSERT INTO - buckets(experiment_id, modular, http_verb, api_path, request_body, impression_id) - VALUES($expId, '1~100', 'POST', '/a/b/c', 'None', 'imp1')""".update().apply() - - } - - "Experiment" should "find bucket list" in { - Experiment.findBy(1, "exp1") should not be empty - - Experiment.findBy(1, "exp1").foreach { exp => - val bucket = exp.buckets.head - bucket.impressionId should equal("imp1") - } - } - - it should "update bucket list after cache ttl time" in { - Experiment.findBy(1, "exp1").foreach { exp => - val bucket = exp.buckets.head - bucket.impressionId should equal("imp1") - - implicit val session = AutoSession - - sql"""UPDATE buckets SET impression_id = 'imp2' WHERE id = ${bucket.id}""".update().apply() - } - - // sleep ttl time - Thread.sleep((Ttl + 1) * 1000) - - // update experiment and bucket - Experiment.findBy(1, "exp1").foreach(exp => exp.buckets) - - // wait for cache updating - Thread.sleep(1 * 1000) - - // should be updated - Experiment.findBy(1, "exp1").foreach { exp => - exp.buckets.head.impressionId should equal("imp2") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala index 40166eb..042dce2 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala @@ -23,81 +23,109 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.rest.TemplateHelper import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.utils.logger import org.scalatest.{FunSuite, Matchers} import play.api.libs.json.Json +import scala.util.{Random, Try} + class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { initTests() - // dummy data for dummy edge - initTests() - import HBaseType.{VERSION1, VERSION2} val ts = System.currentTimeMillis() - val dummyTs = (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - - def ids(version: String) = { - val colId = if (version == VERSION2) columnV2.id.get else column.id.get - val srcId = SourceVertexId(colId, InnerVal.withLong(1, version)) - val tgtId = TargetVertexId(colId, InnerVal.withLong(2, version)) - - val srcIdStr = SourceVertexId(colId, InnerVal.withStr("abc", version)) - val tgtIdStr = TargetVertexId(colId, InnerVal.withStr("def", version)) - - val srcVertex = Vertex(srcId, ts) - val tgtVertex = Vertex(tgtId, ts) - val srcVertexStr = Vertex(srcIdStr, ts) - val tgtVertexStr = Vertex(tgtIdStr, ts) - (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, version) - } - + val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion) def validate(label: Label)(edge: Edge)(sql: String)(expected: Boolean) = { - val whereOpt = WhereParser(label).parse(sql) - whereOpt.isSuccess shouldBe true - - println("=================================================================") - println(sql) - println(whereOpt.get) - - val ret = whereOpt.get.filter(edge) - if (ret != expected) { + def debug(whereOpt: Try[Where]) = { println("==================") println(s"$whereOpt") println(s"$edge") println("==================") } - ret shouldBe expected + + val whereOpt = WhereParser(label).parse(sql) + if (whereOpt.isFailure) { + debug(whereOpt) + whereOpt.get // touch exception + } else { + val ret = whereOpt.get.filter(edge) + if (ret != expected) { + debug(whereOpt) + } + + ret shouldBe expected + } + } + + def ids = for { + version <- Seq(VERSION1, VERSION2) + } yield { + val srcId = SourceVertexId(0, InnerVal.withLong(1, version)) + val tgtId = + if (version == VERSION2) TargetVertexId(0, InnerVal.withStr("2", version)) + else TargetVertexId(0, InnerVal.withLong(2, version)) + + val srcVertex = Vertex(srcId, ts) + val tgtVertex = Vertex(tgtId, ts) + val (_label, dir) = if (version == VERSION2) (labelV2, labelWithDirV2.dir) else (label, labelWithDir.dir) + + (srcVertex, tgtVertex, _label, dir) } test("check where clause not nested") { for { - (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2)) + (srcVertex, tgtVertex, label, dir) <- ids } { /** test for each version */ - val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc") + val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "phone_number" -> "1234") val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner) + val edge = Edge(srcVertex, tgtVertex, label, dir, 0.toByte, ts, propsInner) val f = validate(label)(edge) _ /** labelName label is long-long relation */ - f(s"_to=${tgtVertex.innerId.toString}")(true) - - // currently this throw exception since label`s _to is long type. + f(s"_to=${tgtVertex.innerId}")(true) f(s"_to=19230495")(false) f(s"_to!=19230495")(true) + f(s"phone_number=1234")(true) + } + } + + test("check where clause with string literal") { + for { + (srcVertex, tgtVertex, + label, dir) <- ids + } { + /** test for each version */ + var js = Json.obj("phone_number" -> "") + var propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs + var edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + var f = validate(label)(edge) _ + f(s"phone_number = '' ")(true) + + js = Json.obj("phone_number" -> "010 3167 1897") + propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs + edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + f = validate(label)(edge) _ + f(s"phone_number = '010 3167 1897' ")(true) + + js = Json.obj("phone_number" -> "010' 3167 1897") + propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs + edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + f = validate(label)(edge) _ + f(s"phone_number = '010\\' 3167 1897' ")(true) } } test("check where clause nested") { for { - (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2)) + (srcVertex, tgtVertex, label, dir) <- ids } { /** test for each version */ val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc") val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner) + val edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) val f = validate(label)(edge) _ @@ -119,19 +147,19 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { } } + test("check where clause with from/to long") { for { - (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2)) + (srcVertex, tgtVertex, label, dir) <- ids } { /** test for each version */ val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc") val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val labelWithDirection = if (schemaVer == VERSION2) labelWithDirV2 else labelWithDir - val edge = Edge(srcVertex, tgtVertex, labelWithDirection, 0.toByte, ts, propsInner) - val lname = if (schemaVer == VERSION2) labelNameV2 else labelName + val edge = Edge(srcVertex, tgtVertex, label, dir, 0.toByte, ts, propsInner) val f = validate(label)(edge) _ f(s"_from = -1 or _to = ${tgtVertex.innerId.value}")(true) + f(s"_to = 2")(true) f(s"_from = ${srcVertex.innerId.value} and _to = ${tgtVertex.innerId.value}")(true) f(s"_from = ${tgtVertex.innerId.value} and _to = 102934")(false) f(s"_from = -1")(false) @@ -139,10 +167,9 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { } } - test("check where clause with parent") { for { - (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2)) + (srcVertex, tgtVertex, label, dir) <- ids } { /** test for each version */ val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 1, "name" -> "abc") @@ -151,11 +178,11 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs val parentPropsInner = Management.toProps(label, parentJs.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val grandParentEdge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, parentPropsInner) - val parentEdge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, parentPropsInner, - parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0))) - val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner, - parentEdges = Seq(EdgeWithScore(parentEdge, 1.0))) + val grandParentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner) + val parentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner, + parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0, grandParentEdge.label))) + val edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner, + parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.label))) println(edge.toString) println(parentEdge.toString) @@ -190,13 +217,16 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val ts = 0 import TemplateHelper._ + calculate(ts, 1, "minute") should be(minute + ts) calculate(ts, 1, "hour") should be(hour + ts) calculate(ts, 1, "day") should be(day + ts) calculate(ts + 10, 1, "HOUR") should be(hour + ts + 10) calculate(ts + 10, 1, "DAY") should be(day + ts + 10) - val body = """{ + val body = + """{ + "minute": ${1 minute}, "day": ${1day}, "hour": ${1hour}, "-day": "${-10 day}", @@ -208,21 +238,26 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val parsed = replaceVariable(ts, body) val json = Json.parse(parsed) - (json \ "day").as[Long] should be (1 * day + ts) - (json \ "hour").as[Long] should be (1 * hour + ts) + (json \ "minute").as[Long] should be(1 * minute + ts) - (json \ "-day").as[Long] should be (-10 * day + ts) - (json \ "-hour").as[Long] should be (-10 * hour + ts) + (json \ "day").as[Long] should be(1 * day + ts) + (json \ "hour").as[Long] should be(1 * hour + ts) - (json \ "now").as[Long] should be (ts) + (json \ "-day").as[Long] should be(-10 * day + ts) + (json \ "-hour").as[Long] should be(-10 * hour + ts) - val otherBody = """{ + (json \ "now").as[Long] should be(ts) + + val otherBody = + """{ + "nextminute": "${next_minute}", "nextday": "${next_day}", "3dayago": "${next_day - 3 day}", "nexthour": "${next_hour}" }""" - val currentTs = System.currentTimeMillis() + val currentTs = 1474422964000l + val expectedMinuteTs = currentTs / minute * minute + minute val expectedDayTs = currentTs / day * day + day val expectedHourTs = currentTs / hour * hour + hour val threeDayAgo = expectedDayTs - 3 * day @@ -232,22 +267,15 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val parsed = replaceVariable(ts, otherBody) val json = Json.parse(parsed) + (json \ "nextminute").as[Long] should be(expectedMinuteTs) (json \ "nextday").as[Long] should be(expectedDayTs) (json \ "nexthour").as[Long] should be(expectedHourTs) (json \ "3dayago").as[Long] should be(threeDayAgo) } - } - // test("time decay") { - // val ts = System.currentTimeMillis() - // - // for { - // i <- (0 until 10) - // } { - // val timeUnit = 60 * 60 - // val diff = i * timeUnit - // val x = TimeDecay(1.0, 0.05, timeUnit) - // println(x.decay(diff)) - // } - // } + (0 until 1000).forall { ith => + val r = replaceVariable(ts, "${randint( 10, 30 )}").toInt + r >= 10 && r < 30 + } + } }
