http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala deleted file mode 100644 index 2aad32f..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala +++ /dev/null @@ -1,536 +0,0 @@ -package com.kakao.s2graph.core - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} -import com.kakao.s2graph.core.utils.logger -import org.scalatest.FunSuite -import org.scalatest.matchers.Matcher - -class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { - initTests() - - test("toLogString") { - val testLabelName = labelNameV2 - val bulkQueries = List( - ("1445240543366", "update", "{\"is_blocked\":true}"), - ("1445240543362", "insert", "{\"is_hidden\":false}"), - ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), - ("1445240543363", "delete", "{}"), - ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) - - val (srcId, tgtId, labelName) = ("1", "2", testLabelName) - - val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { - Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString - }).mkString("\n") - - val expected = Seq( - Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{\"is_blocked\":true}"), - Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false}"), - Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false,\"weight\":10}"), - Seq("1445240543363", "delete", "e", "1", "2", testLabelName), - Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{\"time\":1,\"weight\":-10}") - ).map(_.mkString("\t")).mkString("\n") - - assert(bulkEdge === expected) - } - - test("buildOperation") { - val schemaVersion = "v2" - val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = Vertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - - val snapshotEdge = None - val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) - val newVersion = 0L - - val newPropsWithTs = Map( - timestampProp, - 1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) - ) - - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.isDefined) - assert(edgeMutate.edgesToInsert.nonEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: snapshotEdge: None with newProps") { - val schemaVersion = "v2" - val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = Vertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - - val snapshotEdge = None - val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) - val newVersion = 0L - - val newPropsWithTs = Map( - timestampProp, - 1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) - ) - - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.isDefined) - assert(edgeMutate.edgesToInsert.nonEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") { - val schemaVersion = "v2" - val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = Vertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - - val snapshotEdge = None - val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) - val newVersion = 0L - - val newPropsWithTs = propsWithTs - - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.isEmpty) - assert(edgeMutate.edgesToInsert.isEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: All props older than snapshotEdge's LastDeletedAt") { - val schemaVersion = "v2" - val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = Vertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - val oldPropsWithTs = Map( - timestampProp, - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val propsWithTs = Map( - timestampProp, - 3.toByte -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 2), - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val snapshotEdge = - Option(Edge(srcVertex, tgtVertex, labelWithDirV2, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) - - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) - val newVersion = 0L - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.nonEmpty) - assert(edgeMutate.edgesToInsert.isEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") { - val schemaVersion = "v2" - val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = Vertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - val oldPropsWithTs = Map( - timestampProp, - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val propsWithTs = Map( - timestampProp, - 3.toByte -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 4), - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val snapshotEdge = - Option(Edge(srcVertex, tgtVertex, labelWithDirV2, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) - - val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) - val newVersion = 0L - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.nonEmpty) - assert(edgeMutate.edgesToInsert.nonEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } -} - -//import com.kakao.s2graph.core.types._ -//import org.hbase.async.PutRequest -//import org.scalatest.{FunSuite, Matchers} -// -///** -// * Created by shon on 5/29/15. -// */ -//class EdgeTest extends FunSuite with Matchers with TestCommon with TestCommonWithModels { -// -// -// import HBaseType.{VERSION1, VERSION2} -// -// -// def srcVertex(innerVal: InnerValLike)(version: String) = { -// val colId = if (version == VERSION1) column.id.get else columnV2.id.get -// Vertex(SourceVertexId(colId, innerVal), ts) -// } -// -// def tgtVertex(innerVal: InnerValLike)(version: String) = { -// val colId = if (version == VERSION1) column.id.get else columnV2.id.get -// Vertex(TargetVertexId(colId, innerVal), ts) -// } -// -// -// def testEdges(version: String) = { -// val innerVals = if (version == VERSION1) intInnerVals else intInnerValsV2 -// val tgtV = tgtVertex(innerVals.head)(version) -// innerVals.tail.map { intInnerVal => -// val labelWithDirection = if (version == VERSION1) labelWithDir else labelWithDirV2 -// val idxPropsList = if (version == VERSION1) idxPropsLs else idxPropsLsV2 -// idxPropsList.map { idxProps => -// val currentTs = idxProps.toMap.get(0.toByte).get.toString.toLong -// val idxPropsWithTs = idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, currentTs) } -// -// Edge(srcVertex(intInnerVal)(version), tgtV, labelWithDirection, op, currentTs, currentTs, idxPropsWithTs.toMap) -// } -// } -// } -// -// def testPropsUpdate(oldProps: Map[Byte, InnerValLikeWithTs], -// newProps: Map[Byte, InnerValLikeWithTs], -// expected: Map[Byte, Any], -// expectedShouldUpdate: Boolean) -// (f: PropsPairWithTs => (Map[Byte, InnerValLikeWithTs], Boolean))(version: String) = { -// -// val timestamp = newProps.toList.head._2.ts -// val (updated, shouldUpdate) = f((oldProps, newProps, timestamp, version)) -// val rets = for { -// (k, v) <- expected -// } yield { -// v match { -// case v: String => -// v match { -// case "left" => updated.get(k).isDefined && updated(k) == oldProps(k) -// case "right" => updated.get(k).isDefined && updated(k) == newProps(k) -// case "none" => updated.get(k).isEmpty -// } -// case value: InnerValLikeWithTs => updated.get(k).get == value -// case _ => throw new RuntimeException(s"not supported keyword: $v") -// } -// } -// println(rets) -// rets.forall(x => x) && shouldUpdate == expectedShouldUpdate -// } -// -// def testEdgeWithIndex(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = { -// val rets = for { -// edgeForSameTgtVertex <- edges -// } yield { -// val head = edgeForSameTgtVertex.head -// val start = head -// var prev = head -// val rets = for { -// edge <- edgeForSameTgtVertex.tail -// } yield { -// println(s"prevEdge: $prev") -// println(s"currentEdge: $edge") -// val prevEdgeWithIndex = edge.edgesWithIndex -// val edgesWithIndex = edge.edgesWithIndex -// -// /** test encodeing decoding */ -// for { -// edgeWithIndex <- edge.edgesWithIndex -// put <- edgeWithIndex.buildPutsAsync() -// kv <- putToKeyValues(put.asInstanceOf[PutRequest]) -// } { -// val decoded = Edge.toEdge(kv, queryParam, None, Seq()) -// val comp = decoded.isDefined && decoded.get == edge -// println(s"${decoded.get}") -// println(s"$edge") -// println(s"${decoded.get == edge}") -// comp shouldBe true -// -// /** test order -// * same source, target vertex. same indexProps keys. -// * only difference is indexProps values so comparing qualifier is good enough -// * */ -// for { -// prevEdgeWithIndex <- prev.edgesWithIndex -// } { -// println(edgeWithIndex.qualifier) -// println(prevEdgeWithIndex.qualifier) -// println(edgeWithIndex.qualifier.bytes.toList) -// println(prevEdgeWithIndex.qualifier.bytes.toList) -// /** since index of this test label only use 0, 1 as indexProps -// * if 0, 1 is not different then qualifier bytes should be same -// * */ -// val comp = lessThanEqual(edgeWithIndex.qualifier.bytes, prevEdgeWithIndex.qualifier.bytes) -// comp shouldBe true -// } -// } -// prev = edge -// } -// } -// } -// -// def testInvertedEdge(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = { -// val rets = for { -// edgeForSameTgtVertex <- edges -// } yield { -// val head = edgeForSameTgtVertex.head -// val start = head -// var prev = head -// val rets = for { -// edge <- edgeForSameTgtVertex.tail -// } yield { -// println(s"prevEdge: $prev") -// println(s"currentEdge: $edge") -// val prevEdgeWithIndexInverted = prev.toSnapshotEdge -// val edgeWithInvertedIndex = edge.toSnapshotEdge -// /** test encode decoding */ -// -// val put = edgeWithInvertedIndex.buildPutAsync() -// for { -// kv <- putToKeyValues(put) -// } yield { -// val decoded = Edge.toSnapshotEdge(kv, queryParam, None, isInnerCall = false, Seq()) -// val comp = decoded.isDefined && decoded.get == edge -// println(s"${decoded.get}") -// println(s"$edge") -// println(s"${decoded.get == edge}") -// comp shouldBe true -// -// /** no need to test ordering because qualifier only use targetVertexId */ -// } -// prev = edge -// } -// } -// } -// -// test("insert for edgesWithIndex version 2") { -// val version = VERSION2 -// testEdgeWithIndex(testEdges(version))(queryParamV2) -// } -// test("insert for edgesWithIndex version 1") { -// val version = VERSION1 -// testEdgeWithIndex(testEdges(version))(queryParam) -// } -// -// test("insert for edgeWithInvertedIndex version 1") { -// val version = VERSION1 -// testInvertedEdge(testEdges(version))(queryParam) -// } -// -// test("insert for edgeWithInvertedIndex version 2") { -// val version = VERSION2 -// testInvertedEdge(testEdges(version))(queryParamV2) -// } -// -// -// // /** test cases for each operation */ -// -// def oldProps(timestamp: Long, version: String) = { -// Map( -// labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp - 2, timestamp - 2, version), -// 1.toByte -> InnerValLikeWithTs.withLong(0L, timestamp, version), -// 2.toByte -> InnerValLikeWithTs.withLong(1L, timestamp - 1, version), -// 4.toByte -> InnerValLikeWithTs.withStr("old", timestamp - 1, version) -// ) -// } -// -// def newProps(timestamp: Long, version: String) = { -// Map( -// 2.toByte -> InnerValLikeWithTs.withLong(-10L, timestamp, version), -// 3.toByte -> InnerValLikeWithTs.withLong(20L, timestamp, version) -// ) -// } -// -// def deleteProps(timestamp: Long, version: String) = Map( -// labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp, timestamp, version) -// ) -// -// /** upsert */ -// test("Edge.buildUpsert") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "none", -// 2.toByte -> "right", -// 3.toByte -> "right", -// 4.toByte -> "none") -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true -// } -// test("Edge.buildUpsert shouldUpdate false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left", -// 3.toByte -> "none", -// 4.toByte -> "left") -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true -// } -// -// /** update */ -// test("Edge.buildUpdate") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "right", -// 3.toByte -> "right", -// 4.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true -// } -// test("Edge.buildUpdate shouldUpdate false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left", -// 3.toByte -> "none", -// 4.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true -// } -// -// /** delete */ -// test("Edge.buildDelete") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2) -// val newState = deleteProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "right", -// 1.toByte -> "none", -// 2.toByte -> "none", -// 4.toByte -> "none" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true -// } -// test("Edge.buildDelete shouldUpdate false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2) -// val newState = deleteProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left", -// 4.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true -// } -// -// /** increment */ -// test("Edge.buildIncrement") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte) -// val newState = newProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> InnerValLikeWithTs.withLong(-9L, ts - 1, VERSION2), -// 3.toByte -> "right" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true -// } -// test("Edge.buildIncrement shouldRepalce false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte) -// val newState = newProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true -// } -// -// test("Edge`s srcVertex") { -// -// val version = VERSION2 -// val srcId = InnerVal.withLong(10, version) -// val tgtId = InnerVal.withStr("abc", version) -// val srcColumn = columnV2 -// val tgtColumn = tgtColumnV2 -// val srcVertexId = VertexId(srcColumn.id.get, srcId) -// val tgtVertexId = VertexId(tgtColumn.id.get, tgtId) -// -// val srcVertex = Vertex(srcVertexId) -// val tgtVertex = Vertex(tgtVertexId) -// -// val labelId = undirectedLabelV2.id.get -// -// val outDir = LabelWithDirection(labelId, GraphUtil.directions("out")) -// val inDir = LabelWithDirection(labelId, GraphUtil.directions("in")) -// val bothDir = LabelWithDirection(labelId, GraphUtil.directions("undirected")) -// -// val op = GraphUtil.operations("insert") -// -// -// val bothEdge = Edge(srcVertex, tgtVertex, bothDir) -// println(s"edge: $bothEdge") -// bothEdge.relatedEdges.foreach { edge => -// println(edge) -// } -// -// } -// test("edge buildIncrementBulk") { -// -// /** -// * 172567371 List(97, 74, 2, 117, -74, -44, -76, 0, 0, 4, 8, 2) -//169116518 List(68, -110, 2, 117, -21, 124, -103, 0, 0, 4, 9, 2) -//11646834 List(17, 33, 2, 127, 78, 72, -115, 0, 0, 4, 9, 2) -//148171217 List(62, 54, 2, 119, 43, 22, 46, 0, 0, 4, 9, 2) -//116315188 List(41, 86, 2, 121, 17, 43, -53, 0, 0, 4, 9, 2) -//180667876 List(48, -82, 2, 117, 59, 58, 27, 0, 0, 4, 8, 2) -//4594410 List(82, 29, 2, 127, -71, -27, 21, 0, 0, 4, 8, 2) -//151435444 List(1, 105, 2, 118, -7, 71, 75, 0, 0, 4, 8, 2) -//168460895 List(67, -35, 2, 117, -11, 125, -96, 0, 0, 4, 9, 2) -//7941614 List(115, 67, 2, 127, -122, -46, 17, 0, 0, 4, 8, 2) -//171169732 List(61, -42, 2, 117, -52, 40, 59, 0, 0, 4, 9, 2) -//174381375 List(91, 2, 2, 117, -101, 38, -64, 0, 0, 4, 9, 2) -//12754019 List(9, -80, 2, 127, 61, 99, -100, 0, 0, 4, 9, 2) -//175518092 List(111, 32, 2, 117, -119, -50, 115, 0, 0, 4, 8, 2) -//174748531 List(28, -81, 2, 117, -107, -116, -116, 0, 0, 4, 8, 2) -// -// */ -// // val incrementsOpt = Edge.buildIncrementDegreeBulk("169116518", "talk_friend_long_term_agg_test", "out", 10) -// // -// // for { -// // increments <- incrementsOpt -// // increment <- increments -// // (cf, qs) <- increment.getFamilyMapOfLongs -// // (q, v) <- qs -// // } { -// // println(increment.getRow.toList) -// // println(q.toList) -// // println(v) -// // } -// //List(68, -110, -29, -4, 116, -24, 124, -37, 0, 0, 52, -44, 2) -// } -//}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala deleted file mode 100644 index 1c09778..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala +++ /dev/null @@ -1,226 +0,0 @@ -package com.kakao.s2graph.core.Integrate - -import com.kakao.s2graph.core.mysqls._ -import play.api.libs.json.{JsObject, Json} - -class CrudTest extends IntegrateCommon { - import CrudHelper._ - import TestUtil._ - - test("test CRUD") { - var tcNum = 0 - var tcString = "" - var bulkQueries = List.empty[(Long, String, String)] - var expected = Map.empty[String, String] - - val curTime = System.currentTimeMillis - val t1 = curTime + 0 - val t2 = curTime + 1 - val t3 = curTime + 2 - val t4 = curTime + 3 - val t5 = curTime + 4 - - val tcRunner = new CrudTestRunner() - tcNum = 1 - tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " - - bulkQueries = List( - (t1, "insert", "{\"time\": 10}"), - (t2, "delete", ""), - (t3, "insert", "{\"time\": 10, \"weight\": 20}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 2 - tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " - bulkQueries = List( - (t1, "insert", "{\"time\": 10}"), - (t3, "insert", "{\"time\": 10, \"weight\": 20}"), - (t2, "delete", "")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 3 - tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test " - bulkQueries = List( - (t3, "insert", "{\"time\": 10, \"weight\": 20}"), - (t2, "delete", ""), - (t1, "insert", "{\"time\": 10}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 4 - tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test " - bulkQueries = List( - (t3, "insert", "{\"time\": 10, \"weight\": 20}"), - (t1, "insert", "{\"time\": 10}"), - (t2, "delete", "")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 5 - tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test" - bulkQueries = List( - (t2, "delete", ""), - (t1, "insert", "{\"time\": 10}"), - (t3, "insert", "{\"time\": 10, \"weight\": 20}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 6 - tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test " - bulkQueries = List( - (t2, "delete", ""), - (t3, "insert", "{\"time\": 10, \"weight\": 20}"), - (t1, "insert", "{\"time\": 10}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 7 - tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test " - bulkQueries = List( - (t1, "update", "{\"time\": 10}"), - (t2, "delete", ""), - (t3, "update", "{\"time\": 10, \"weight\": 20}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 8 - tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test " - bulkQueries = List( - (t1, "update", "{\"time\": 10}"), - (t3, "update", "{\"time\": 10, \"weight\": 20}"), - (t2, "delete", "")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 9 - tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test " - bulkQueries = List( - (t2, "delete", ""), - (t1, "update", "{\"time\": 10}"), - (t3, "update", "{\"time\": 10, \"weight\": 20}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 10 - tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test" - bulkQueries = List( - (t2, "delete", ""), - (t3, "update", "{\"time\": 10, \"weight\": 20}"), - (t1, "update", "{\"time\": 10}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 11 - tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test " - bulkQueries = List( - (t3, "update", "{\"time\": 10, \"weight\": 20}"), - (t2, "delete", ""), - (t1, "update", "{\"time\": 10}")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 12 - tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test " - bulkQueries = List( - (t3, "update", "{\"time\": 10, \"weight\": 20}"), - (t1, "update", "{\"time\": 10}"), - (t2, "delete", "")) - expected = Map("time" -> "10", "weight" -> "20") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 13 - tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test " - bulkQueries = List( - (t5, "update", "{\"is_blocked\": true}"), - (t1, "insert", "{\"is_hidden\": false}"), - (t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"), - (t2, "delete", ""), - (t4, "update", "{\"time\": 1, \"weight\": -10}")) - expected = Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", "is_blocked" -> "true") - - tcRunner.run(tcNum, tcString, bulkQueries, expected) - } - - - object CrudHelper { - - class CrudTestRunner { - var seed = 0 - - def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = { - for { - labelName <- List(testLabelName, testLabelName2) - i <- 0 until NumOfEachTest - } { - seed += 1 - val srcId = seed.toString - val tgtId = srcId - - val maxTs = opWithProps.map(t => t._1).max - - /** insert edges */ - println(s"---- TC${tcNum}_init ----") - val bulkEdges = (for ((ts, op, props) <- opWithProps) yield { - TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props) - }) - - TestUtil.insertEdgesSync(bulkEdges: _*) - - for { - label <- Label.findByName(labelName) - direction <- List("out", "in") - cacheTTL <- List(-1L) - } { - val (serviceName, columnName, id, otherId) = direction match { - case "out" => (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId) - case "in" => (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId) - } - - val qId = if (labelName == testLabelName) id else "\"" + id + "\"" - val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL) - - val jsResult = TestUtil.getEdgesSync(query) - - val results = jsResult \ "results" - val deegrees = (jsResult \ "degrees").as[List[JsObject]] - val propsLs = (results \\ "props").seq - (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1) - - val from = (results \\ "from").seq.last.toString.replaceAll("\"", "") - val to = (results \\ "to").seq.last.toString.replaceAll("\"", "") - - from should be(id.toString) - to should be(otherId.toString) - (results \\ "_timestamp").seq.last.as[Long] should be(maxTs) - - for ((key, expectedVal) <- expected) { - propsLs.last.as[JsObject].keys.contains(key) should be(true) - (propsLs.last \ key).toString should be(expectedVal) - } - } - } - } - - def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse( - s""" { "srcVertices": [ - { "serviceName": "$serviceName", - "columnName": "$columnName", - "id": $id } ], - "steps": [ [ { - "label": "$labelName", - "direction": "$dir", - "offset": 0, - "limit": 10, - "cacheTTL": $cacheTTL }]]}""") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala deleted file mode 100644 index f8bf7af..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala +++ /dev/null @@ -1,309 +0,0 @@ -package com.kakao.s2graph.core.Integrate - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.Label -import com.kakao.s2graph.core.rest.{RequestParser, RestHandler} -import com.kakao.s2graph.core.utils.logger -import com.typesafe.config._ -import org.scalatest._ -import play.api.libs.json.{JsValue, Json} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} - -trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { - - import TestUtil._ - - var graph: Graph = _ - var parser: RequestParser = _ - var management: Management = _ - var config: Config = _ - - override def beforeAll = { - config = ConfigFactory.load() - graph = new Graph(config)(ExecutionContext.Implicits.global) - management = new Management(graph) - parser = new RequestParser(graph.config) - initTestData() - } - - override def afterAll(): Unit = { - graph.shutdown() - } - - /** - * Make Service, Label, Vertex for integrate test - */ - def initTestData() = { - println("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - Management.deleteService(testServiceName) - - // 1. createService - val jsValue = Json.parse(createService) - val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = - parser.toServiceElements(jsValue) - - val tryRes = - management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) - println(s">> Service created : $createService, $tryRes") - - val labelNames = Map(testLabelName -> testLabelNameCreate, - testLabelName2 -> testLabelName2Create, - testLabelNameV1 -> testLabelNameV1Create, - testLabelNameWeak -> testLabelNameWeakCreate) - - for { - (labelName, create) <- labelNames - } { - Management.deleteLabel(labelName) - Label.findByName(labelName, useCache = false) match { - case None => - val json = Json.parse(create) - val tryRes = for { - labelArgs <- parser.toLabelElements(json) - label <- (management.createLabel _).tupled(labelArgs) - } yield label - - tryRes.get - case Some(label) => - println(s">> Label already exist: $create, $label") - } - } - - val vertexPropsKeys = List("age" -> "int") - - vertexPropsKeys.map { case (key, keyType) => - Management.addVertexProp(testServiceName, testColumnName, key, keyType) - } - - println("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - } - - - /** - * Test Helpers - */ - object TestUtil { - implicit def ec = scala.concurrent.ExecutionContext.global - - // def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = { - // val arr = for { - // (label, dir, from, to) <- params - // } yield { - // Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to) - // } - // - // val s = Json.toJson(arr) - // s - // } - - def deleteAllSync(jsValue: JsValue) = { - val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json => - val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json) - val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) - - future - }) - - Await.result(future, HttpRequestWaitingTime) - } - - def getEdgesSync(queryJson: JsValue): JsValue = { - logger.info(Json.prettyPrint(queryJson)) - val restHandler = new RestHandler(graph) - Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime) - } - - def insertEdgesSync(bulkEdges: String*) = { - val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) - val jsResult = Await.result(req, HttpRequestWaitingTime) - - jsResult - } - - def insertEdgesAsync(bulkEdges: String*) = { - val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) - req - } - - def toEdge(elems: Any*): String = elems.mkString("\t") - - // common tables - val testServiceName = "s2graph" - val testLabelName = "s2graph_label_test" - val testLabelName2 = "s2graph_label_test_2" - val testLabelNameV1 = "s2graph_label_test_v1" - val testLabelNameWeak = "s2graph_label_test_weak" - val testColumnName = "user_id_test" - val testColumnType = "long" - val testTgtColumnName = "item_id_test" - val testHTableName = "test-htable" - val newHTableName = "new-htable" - val index1 = "idx_1" - val index2 = "idx_2" - - val NumOfEachTest = 30 - val HttpRequestWaitingTime = Duration("60 seconds") - - val createService = s"""{"serviceName" : "$testServiceName"}""" - - val testLabelNameCreate = - s""" - { - "label": "$testLabelName", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "$testColumnName", - "tgtColumnType": "long", - "indices": [ - {"name": "$index1", "propNames": ["weight", "time", "is_hidden", "is_blocked"]}, - {"name": "$index2", "propNames": ["_timestamp"]} - ], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "strong", - "schemaVersion": "v4", - "compressionAlgorithm": "gz", - "hTableName": "$testHTableName" - }""" - - val testLabelName2Create = - s""" - { - "label": "$testLabelName2", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "$testTgtColumnName", - "tgtColumnType": "string", - "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "strong", - "isDirected": false, - "schemaVersion": "v3", - "compressionAlgorithm": "gz" - }""" - - val testLabelNameV1Create = - s""" - { - "label": "$testLabelNameV1", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "${testTgtColumnName}_v1", - "tgtColumnType": "string", - "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "strong", - "isDirected": true, - "schemaVersion": "v1", - "compressionAlgorithm": "gz" - }""" - - val testLabelNameWeakCreate = - s""" - { - "label": "$testLabelNameWeak", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "$testTgtColumnName", - "tgtColumnType": "string", - "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "weak", - "isDirected": true, - "compressionAlgorithm": "gz" - }""" - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala deleted file mode 100644 index 0b26608..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala +++ /dev/null @@ -1,909 +0,0 @@ -package com.kakao.s2graph.core.Integrate - -import com.kakao.s2graph.core.GraphExceptions.BadQueryException -import com.kakao.s2graph.core.utils.logger -import org.scalatest.BeforeAndAfterEach -import play.api.libs.json.{JsNull, JsNumber, JsValue, Json} - -import scala.util.{Success, Try} - -class QueryTest extends IntegrateCommon with BeforeAndAfterEach { - - import TestUtil._ - - val insert = "insert" - val e = "e" - val weight = "weight" - val is_hidden = "is_hidden" - - test("interval") { - def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - [ { - "label": "$testLabelName", - "index": "$index", - "interval": { - "from": [ { "$prop": $fromVal } ], - "to": [ { "$prop": $toVal } ] - } - } - ]] - } - """) - - var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index - (edges \ "size").toString should be("1") - - edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index - (edges \ "size").toString should be("2") - - edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index - (edges \ "size").toString should be("1") - - edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index - (edges \ "size").toString should be("2") - } - - test("get edge with where condition") { - def queryWhere(id: Int, where: String) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 100, - "where": "${where}" - } - ]] - }""") - - var result = getEdgesSync(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) - (result \ "results").as[List[JsValue]].size should be(1) - - result = getEdgesSync(queryWhere(0, "is_hidden=true and _to in (1)")) - (result \ "results").as[List[JsValue]].size should be(1) - - result = getEdgesSync(queryWhere(0, "_from=0")) - (result \ "results").as[List[JsValue]].size should be(2) - - result = getEdgesSync(queryWhere(2, "_from=2 or weight in (-1)")) - (result \ "results").as[List[JsValue]].size should be(2) - - result = getEdgesSync(queryWhere(2, "_from=2 and weight in (10, 20)")) - (result \ "results").as[List[JsValue]].size should be(2) - } - - test("get edge exclude") { - def queryExclude(id: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 2 - }, - { - "label": "${testLabelName}", - "direction": "in", - "offset": 0, - "limit": 2, - "exclude": true - } - ]] - }""") - - val result = getEdgesSync(queryExclude(0)) - (result \ "results").as[List[JsValue]].size should be(1) - } - - test("get edge groupBy property") { - def queryGroupBy(id: Int, props: Seq[String]): JsValue = { - Json.obj( - "groupBy" -> props, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName - ) - ) - ) - ) - ) - } - - val result = getEdgesSync(queryGroupBy(0, Seq("weight"))) - (result \ "size").as[Int] should be(2) - val weights = (result \ "results" \\ "groupBy").map { js => - (js \ "weight").as[Int] - } - weights should contain(30) - weights should contain(40) - - weights should not contain (10) - } - - test("edge transform") { - def queryTransform(id: Int, transforms: String) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "transform": $transforms - } - ]] - }""") - - var result = getEdgesSync(queryTransform(0, "[[\"_to\"]]")) - (result \ "results").as[List[JsValue]].size should be(2) - - result = getEdgesSync(queryTransform(0, "[[\"weight\"]]")) - (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "weight").map(_.toString).sorted) - - result = getEdgesSync(queryTransform(0, "[[\"_from\"]]")) - (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "from").map(_.toString).sorted) - } - - test("index") { - def queryIndex(ids: Seq[Int], indexName: String) = { - val $from = Json.arr( - Json.obj("serviceName" -> testServiceName, - "columnName" -> testColumnName, - "ids" -> ids)) - - val $step = Json.arr(Json.obj("label" -> testLabelName, "index" -> indexName)) - val $steps = Json.arr(Json.obj("step" -> $step)) - - val js = Json.obj("withScore" -> false, "srcVertices" -> $from, "steps" -> $steps) - js - } - - // weight order - var result = getEdgesSync(queryIndex(Seq(0), "idx_1")) - ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(40)) - - // timestamp order - result = getEdgesSync(queryIndex(Seq(0), "idx_2")) - ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(30)) - } - - // "checkEdges" in { - // running(FakeApplication()) { - // val json = Json.parse( s""" - // [{"from": 0, "to": 1, "label": "$testLabelName"}, - // {"from": 0, "to": 2, "label": "$testLabelName"}] - // """) - // - // def checkEdges(queryJson: JsValue): JsValue = { - // val ret = route(FakeRequest(POST, "/graphs/checkEdges").withJsonBody(queryJson)).get - // contentAsJson(ret) - // } - // - // val res = checkEdges(json) - // val typeRes = res.isInstanceOf[JsArray] - // typeRes must equalTo(true) - // - // val fst = res.as[Seq[JsValue]].head \ "to" - // fst.as[Int] must equalTo(1) - // - // val snd = res.as[Seq[JsValue]].last \ "to" - // snd.as[Int] must equalTo(2) - // } - // } - - - - test("duration") { - def queryDuration(ids: Seq[Int], from: Int, to: Int) = { - val $from = Json.arr( - Json.obj("serviceName" -> testServiceName, - "columnName" -> testColumnName, - "ids" -> ids)) - - val $step = Json.arr(Json.obj( - "label" -> testLabelName, "direction" -> "out", "offset" -> 0, "limit" -> 100, - "duration" -> Json.obj("from" -> from, "to" -> to))) - - val $steps = Json.arr(Json.obj("step" -> $step)) - - Json.obj("srcVertices" -> $from, "steps" -> $steps) - } - - // get all - var result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) - (result \ "results").as[List[JsValue]].size should be(4) - // inclusive, exclusive - result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) - (result \ "results").as[List[JsValue]].size should be(3) - - result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) - (result \ "results").as[List[JsValue]].size should be(1) - - val bulkEdges = Seq( - toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), - toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), - toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), - toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) - ) - insertEdgesSync(bulkEdges: _*) - - // duration test after udpate - // get all - result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) - (result \ "results").as[List[JsValue]].size should be(4) - - // inclusive, exclusive - result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) - (result \ "results").as[List[JsValue]].size should be(3) - - result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) - (result \ "results").as[List[JsValue]].size should be(1) - - } - - - test("return tree") { - def queryParents(id: Long) = Json.parse( - s""" - { - "returnTree": true, - "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - [ { - "label": "$testLabelName", - "direction": "out", - "offset": 0, - "limit": 2 - } - ],[{ - "label": "$testLabelName", - "direction": "in", - "offset": 0, - "limit": -1 - } - ]] - }""".stripMargin) - - val src = 100 - val tgt = 200 - - insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) - - val result = TestUtil.getEdgesSync(queryParents(src)) - val parents = (result \ "results").as[Seq[JsValue]] - val ret = parents.forall { - edge => (edge \ "parents").as[Seq[JsValue]].size == 1 - } - - ret should be(true) - } - - - -// test("pagination and _to") { -// def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( -// s""" -// { "srcVertices": [ -// { "serviceName": "${testServiceName}", -// "columnName": "${testColumnName}", -// "id": ${id} -// }], -// "steps": [ -// [ { -// "label": "${testLabelName}", -// "direction": "out", -// "offset": $offset, -// "limit": $limit, -// "_to": $to -// } -// ]] -// } -// """) -// -// val src = System.currentTimeMillis().toInt -// -// val bulkEdges = Seq( -// toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), -// toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), -// toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), -// toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) -// ) -// insertEdgesSync(bulkEdges: _*) -// -// var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) -// var edges = (result \ "results").as[List[JsValue]] -// -// edges.size should be(2) -// (edges(0) \ "to").as[Long] should be(4) -// (edges(1) \ "to").as[Long] should be(3) -// -// result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) -// -// edges = (result \ "results").as[List[JsValue]] -// edges.size should be(2) -// (edges(0) \ "to").as[Long] should be(3) -// (edges(1) \ "to").as[Long] should be(2) -// -// result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) -// edges = (result \ "results").as[List[JsValue]] -// edges.size should be(1) -// } - - test("order by") { - def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj( - "srcVertices" -> Json.arr( - Json.obj( - "serviceName" -> testServiceName, - "columnName" -> testColumnName, - "id" -> id - ) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName, - "scoring" -> scoring - ) - ) - ) - ) - ) - def queryOrderBy(id: Int, scoring: Map[String, Int], props: Seq[Map[String, String]]): JsValue = Json.obj( - "orderBy" -> props, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName, - "scoring" -> scoring - ) - ) - ) - ) - ) - - val bulkEdges = Seq( - toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), - toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), - toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), - toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) - ) - - insertEdgesSync(bulkEdges: _*) - - // get edges - val edges = getEdgesSync(queryScore(0, Map("weight" -> 1))) - val orderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC")))) - val ascOrderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC")))) - - val edgesTo = edges \ "results" \\ "to" - val orderByTo = orderByScore \ "results" \\ "to" - val ascOrderByTo = ascOrderByScore \ "results" \\ "to" - - edgesTo should be(Seq(JsNumber(2), JsNumber(1))) - edgesTo should be(orderByTo) - ascOrderByTo should be(Seq(JsNumber(1), JsNumber(2))) - edgesTo.reverse should be(ascOrderByTo) - } - - - test("query with sampling") { - def queryWithSampling(id: Int, sample: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - { - "step": [{ - "label": "$testLabelName", - "direction": "out", - "offset": 0, - "limit": 100, - "sample": $sample - }] - } - ] - }""") - - def twoStepQueryWithSampling(id: Int, sample: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - { - "step": [{ - "label": "$testLabelName", - "direction": "out", - "offset": 0, - "limit": 100, - "sample": $sample - }] - }, - { - "step": [{ - "label": "$testLabelName", - "direction": "out", - "offset": 0, - "limit": 100, - "sample": $sample - }] - } - ] - }""") - - def twoQueryWithSampling(id: Int, sample: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - { - "step": [{ - "label": "$testLabelName", - "direction": "out", - "offset": 0, - "limit": 50, - "sample": $sample - }, - { - "label": "$testLabelName2", - "direction": "out", - "offset": 0, - "limit": 50 - }] - } - ] - }""") - - val sampleSize = 2 - val ts = "1442985659166" - val testId = 22 - - val bulkEdges = Seq( - toEdge(ts, insert, e, testId, 122, testLabelName), - toEdge(ts, insert, e, testId, 222, testLabelName), - toEdge(ts, insert, e, testId, 322, testLabelName), - - toEdge(ts, insert, e, testId, 922, testLabelName2), - toEdge(ts, insert, e, testId, 222, testLabelName2), - toEdge(ts, insert, e, testId, 322, testLabelName2), - - toEdge(ts, insert, e, 122, 1122, testLabelName), - toEdge(ts, insert, e, 122, 1222, testLabelName), - toEdge(ts, insert, e, 122, 1322, testLabelName), - toEdge(ts, insert, e, 222, 2122, testLabelName), - toEdge(ts, insert, e, 222, 2222, testLabelName), - toEdge(ts, insert, e, 222, 2322, testLabelName), - toEdge(ts, insert, e, 322, 3122, testLabelName), - toEdge(ts, insert, e, 322, 3222, testLabelName), - toEdge(ts, insert, e, 322, 3322, testLabelName) - ) - - insertEdgesSync(bulkEdges: _*) - - val result1 = getEdgesSync(queryWithSampling(testId, sampleSize)) - (result1 \ "results").as[List[JsValue]].size should be(math.min(sampleSize, bulkEdges.size)) - - val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize)) - (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) - - val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize)) - (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3 - } - test("test query with filterOut query") { - def queryWithFilterOut(id1: String, id2: String) = Json.parse( - s"""{ - | "limit": 10, - | "filterOut": { - | "srcVertices": [{ - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id1 - | }], - | "steps": [{ - | "step": [{ - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10 - | }] - | }] - | }, - | "srcVertices": [{ - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id2 - | }], - | "steps": [{ - | "step": [{ - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 5 - | }] - | }] - |} - """.stripMargin - ) - - val testId1 = "-23" - val testId2 = "-25" - - val bulkEdges = Seq( - toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), - toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), - toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), - toEdge(4, insert, e, testId2, 111, testLabelName, Json.obj(weight -> 1)), - toEdge(5, insert, e, testId2, 333, testLabelName, Json.obj(weight -> 1)), - toEdge(6, insert, e, testId2, 555, testLabelName, Json.obj(weight -> 1)) - ) - logger.debug(s"${bulkEdges.mkString("\n")}") - insertEdgesSync(bulkEdges: _*) - - val rs = getEdgesSync(queryWithFilterOut(testId1, testId2)) - logger.debug(Json.prettyPrint(rs)) - val results = (rs \ "results").as[List[JsValue]] - results.size should be(1) - (results(0) \ "to").toString should be("555") - } - - - /** note that this merge two different label result into one */ - test("weighted union") { - def queryWithWeightedUnion(id1: String, id2: String) = Json.parse( - s""" - |{ - | "limit": 10, - | "weights": [ - | 10, - | 1 - | ], - | "groupBy": ["weight"], - | "queries": [ - | { - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id1 - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 5 - | } - | ] - | } - | ] - | }, - | { - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id2 - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName2", - | "direction": "out", - | "offset": 0, - | "limit": 5 - | } - | ] - | } - | ] - | } - | ] - |} - """.stripMargin - ) - - val testId1 = "1" - val testId2 = "2" - - val bulkEdges = Seq( - toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), - toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), - toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), - toEdge(4, insert, e, testId2, 444, testLabelName2, Json.obj(weight -> 1)), - toEdge(5, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)), - toEdge(6, insert, e, testId2, 666, testLabelName2, Json.obj(weight -> 1)) - ) - - insertEdgesSync(bulkEdges: _*) - - val rs = getEdgesSync(queryWithWeightedUnion(testId1, testId2)) - logger.debug(Json.prettyPrint(rs)) - val results = (rs \ "results").as[List[JsValue]] - results.size should be(2) - (results(0) \ "scoreSum").as[Float] should be(30.0) - (results(0) \ "agg").as[List[JsValue]].size should be(3) - (results(1) \ "scoreSum").as[Float] should be(3.0) - (results(1) \ "agg").as[List[JsValue]].size should be(3) - } - - test("weighted union with options") { - def queryWithWeightedUnionWithOptions(id1: String, id2: String) = Json.parse( - s""" - |{ - | "limit": 10, - | "weights": [ - | 10, - | 1 - | ], - | "groupBy": ["to"], - | "select": ["to", "weight"], - | "filterOut": { - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id1 - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10 - | } - | ] - | } - | ] - | }, - | "queries": [ - | { - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id1 - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 5 - | } - | ] - | } - | ] - | }, - | { - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id2 - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName2", - | "direction": "out", - | "offset": 0, - | "limit": 5 - | } - | ] - | } - | ] - | } - | ] - |} - """.stripMargin - ) - - val testId1 = "-192848" - val testId2 = "-193849" - - val bulkEdges = Seq( - toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), - toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), - toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), - toEdge(4, insert, e, testId2, 111, testLabelName2, Json.obj(weight -> 1)), - toEdge(5, insert, e, testId2, 333, testLabelName2, Json.obj(weight -> 1)), - toEdge(6, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)) - ) - - insertEdgesSync(bulkEdges: _*) - - val rs = getEdgesSync(queryWithWeightedUnionWithOptions(testId1, testId2)) - logger.debug(Json.prettyPrint(rs)) - val results = (rs \ "results").as[List[JsValue]] - results.size should be(1) - - } - - test("scoreThreshold") { - def queryWithScoreThreshold(id: String, scoreThreshold: Int) = Json.parse( - s"""{ - | "limit": 10, - | "scoreThreshold": $scoreThreshold, - | "groupBy": ["to"], - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "id": $id - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10 - | } - | ] - | }, - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10 - | } - | ] - | } - | ] - |} - """.stripMargin - ) - - val testId = "-23903" - - val bulkEdges = Seq( - toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, 101, 102, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, 101, 103, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, 101, 104, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, 102, 103, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, 102, 104, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, 103, 105, testLabelName, Json.obj(weight -> 10)) - ) - // expected: 104 -> 2, 103 -> 2, 102 -> 1,, 105 -> 1 - insertEdgesSync(bulkEdges: _*) - - var rs = getEdgesSync(queryWithScoreThreshold(testId, 2)) - logger.debug(Json.prettyPrint(rs)) - var results = (rs \ "results").as[List[JsValue]] - results.size should be(2) - - rs = getEdgesSync(queryWithScoreThreshold(testId, 1)) - logger.debug(Json.prettyPrint(rs)) - - results = (rs \ "results").as[List[JsValue]] - results.size should be(4) - } - - def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $id - }], - "steps": [ - [ { - "label": "$testLabelName", - "direction": "out", - "offset": $offset, - "limit": $limit - } - ]] - } - """) - - def queryGlobalLimit(id: Int, limit: Int): JsValue = Json.obj( - "limit" -> limit, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName - ) - ) - ) - ) - ) - - - // called by each test, each - override def beforeEach = initTestData() - - // called by start test, once - override def initTestData(): Unit = { - super.initTestData() - - insertEdgesSync( - toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)), - toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)), - toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)), - toEdge(4000, insert, e, 2, 1, testLabelName, Json.obj(weight -> 10)), - toEdge(3000, insert, e, 10, 20, testLabelName, Json.obj(weight -> 20)), - toEdge(4000, insert, e, 20, 20, testLabelName, Json.obj(weight -> 10)), - toEdge(1, insert, e, -1, 1000, testLabelName), - toEdge(1, insert, e, -1, 2000, testLabelName), - toEdge(1, insert, e, -1, 3000, testLabelName), - toEdge(1, insert, e, 1000, 10000, testLabelName), - toEdge(1, insert, e, 1000, 11000, testLabelName), - toEdge(1, insert, e, 2000, 11000, testLabelName), - toEdge(1, insert, e, 2000, 12000, testLabelName), - toEdge(1, insert, e, 3000, 12000, testLabelName), - toEdge(1, insert, e, 3000, 13000, testLabelName), - toEdge(1, insert, e, 10000, 100000, testLabelName), - toEdge(2, insert, e, 11000, 200000, testLabelName), - toEdge(3, insert, e, 12000, 300000, testLabelName) - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala deleted file mode 100644 index f4da49d..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala +++ /dev/null @@ -1,283 +0,0 @@ -package com.kakao.s2graph.core.Integrate - -import java.util.concurrent.TimeUnit - -import play.api.libs.json.{JsValue, Json} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.util.Random - -class StrongLabelDeleteTest extends IntegrateCommon { - - import StrongDeleteUtil._ - import TestUtil._ - - test("Strong consistency select") { - insertEdgesSync(bulkEdges(): _*) - - var result = getEdgesSync(query(0)) - (result \ "results").as[List[JsValue]].size should be(2) - result = getEdgesSync(query(10)) - (result \ "results").as[List[JsValue]].size should be(2) - } - - test("Strong consistency deleteAll") { - val deletedAt = 100 - var result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) - - println(result) - (result \ "results").as[List[JsValue]].size should be(3) - - val deleteParam = Json.arr( - Json.obj("label" -> testLabelName2, - "direction" -> "in", - "ids" -> Json.arr("20"), - "timestamp" -> deletedAt)) - - deleteAllSync(deleteParam) - - result = getEdgesSync(query(11, direction = "out")) - println(result) - (result \ "results").as[List[JsValue]].size should be(0) - - result = getEdgesSync(query(12, direction = "out")) - println(result) - (result \ "results").as[List[JsValue]].size should be(0) - - result = getEdgesSync(query(10, direction = "out")) - println(result) - // 10 -> out -> 20 should not be in result. - (result \ "results").as[List[JsValue]].size should be(1) - (result \\ "to").size should be(1) - (result \\ "to").head.as[String] should be("21") - - result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size should be(0) - - insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) - - result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - - (result \ "results").as[List[JsValue]].size should be(3) - } - - - test("update delete") { - val ret = for { - i <- 0 until testNum - } yield { - val src = (i + 1) * 10000 -// val src = System.currentTimeMillis() - - val (ret, last) = testInner(i, src) - ret should be(true) - ret - } - - ret.forall(identity) - } - - test("update delete 2") { - val src = System.currentTimeMillis() - var ts = 0L - - val ret = for { - i <- 0 until testNum - } yield { - val (ret, lastTs) = testInner(ts, src) - val deletedAt = lastTs + 1 - val deletedAt2 = lastTs + 2 - ts = deletedAt2 + 1 // nex start ts - - ret should be(true) - - val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) - val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) - - val deleteRet = deleteAllSync(deleteAllRequest) - val deleteRet2 = deleteAllSync(deleteAllRequest2) - - val result = getEdgesSync(query(id = src)) - println(result) - - val resultEdges = (result \ "results").as[Seq[JsValue]] - resultEdges.isEmpty should be(true) - - val degreeAfterDeleteAll = getDegree(result) - - degreeAfterDeleteAll should be(0) - degreeAfterDeleteAll === (0) - } - - ret.forall(identity) - } - - /** This test stress out test on degree - * when contention is low but number of adjacent edges are large - * Large set of contention test - */ - test("large degrees") { - val labelName = testLabelName2 - val dir = "out" - val maxSize = 100 - val deleteSize = 10 - val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } - val deleteTgts = Random.shuffle(tgts).take(deleteSize) - val insertRequests = tgts.map { tgt => - Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val deleteRequests = deleteTgts.take(deleteSize).map { tgt => - Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val allRequests = Random.shuffle(insertRequests ++ deleteRequests) - // val allRequests = insertRequests ++ deleteRequests - val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => - insertEdgesAsync(bulkRequests: _*) - } - - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val expectedDegree = insertRequests.size - deleteRequests.size - val queryJson = query(id = src) - val result = getEdgesSync(queryJson) - val resultSize = (result \ "size").as[Long] - val resultDegree = getDegree(result) - - // println(result) - - val ret = resultSize == expectedDegree && resultDegree == resultSize - println(s"[MaxSize]: $maxSize") - println(s"[DeleteSize]: $deleteSize") - println(s"[ResultDegree]: $resultDegree") - println(s"[ExpectedDegree]: $expectedDegree") - println(s"[ResultSize]: $resultSize") - ret should be(true) - } - - test("deleteAll") { - val labelName = testLabelName2 - val dir = "out" - val maxSize = 100 - val deleteSize = 10 - val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } - val deleteTgts = Random.shuffle(tgts).take(deleteSize) - val insertRequests = tgts.map { tgt => - Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val deleteRequests = deleteTgts.take(deleteSize).map { tgt => - Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val allRequests = Random.shuffle(insertRequests ++ deleteRequests) - val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => - insertEdgesAsync(bulkRequests: _*) - } - - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val deletedAt = System.currentTimeMillis() - val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) - - deleteAllSync(deleteAllRequest) - - val result = getEdgesSync(query(id = src)) - println(result) - val resultEdges = (result \ "results").as[Seq[JsValue]] - resultEdges.isEmpty should be(true) - - val degreeAfterDeleteAll = getDegree(result) - degreeAfterDeleteAll should be(0) - } - - object StrongDeleteUtil { - - val labelName = testLabelName2 -// val labelName = testLabelName - val maxTgtId = 10 - val batchSize = 10 - val testNum = 100 - val numOfBatch = 10 - - def testInner(startTs: Long, src: Long) = { - val lastOps = Array.fill(maxTgtId)("none") - var currentTs = startTs - - val allRequests = for { - ith <- 0 until numOfBatch - jth <- 0 until batchSize - } yield { - currentTs += 1 - - val tgt = Random.nextInt(maxTgtId) - val op = if (Random.nextDouble() < 0.5) "delete" else "update" - - lastOps(tgt) = op - Seq(currentTs, op, "e", src, tgt, labelName, "{}").mkString("\t") - } - - allRequests.foreach(println(_)) - - val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests => - insertEdgesAsync(bulkRequests: _*) - } - - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val expectedDegree = lastOps.count(op => op != "delete" && op != "none") - val queryJson = query(id = src) - val result = getEdgesSync(queryJson) - val resultSize = (result \ "size").as[Long] - val resultDegree = getDegree(result) - - println(lastOps.toList) - println(result) - - val ret = resultDegree == expectedDegree && resultSize == resultDegree - if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") - - (ret, currentTs) - } - - def bulkEdges(startTs: Int = 0) = Seq( - toEdge(startTs + 1, "insert", "e", "0", "1", labelName, s"""{"time": 10}"""), - toEdge(startTs + 2, "insert", "e", "0", "1", labelName, s"""{"time": 11}"""), - toEdge(startTs + 3, "insert", "e", "0", "1", labelName, s"""{"time": 12}"""), - toEdge(startTs + 4, "insert", "e", "0", "2", labelName, s"""{"time": 10}"""), - toEdge(startTs + 5, "insert", "e", "10", "20", labelName, s"""{"time": 10}"""), - toEdge(startTs + 6, "insert", "e", "10", "21", labelName, s"""{"time": 11}"""), - toEdge(startTs + 7, "insert", "e", "11", "20", labelName, s"""{"time": 12}"""), - toEdge(startTs + 8, "insert", "e", "12", "20", labelName, s"""{"time": 13}""") - ) - - def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, - _labelName: String = labelName, direction: String = "out") = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$serviceName", - "columnName": "$columnName", - "id": $id - }], - "steps": [ - [ { - "label": "${_labelName}", - "direction": "${direction}", - "offset": 0, - "limit": -1, - "duplicate": "raw" - } - ]] - }""") - - def getDegree(jsValue: JsValue): Long = { - ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala deleted file mode 100644 index ffbec11..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala +++ /dev/null @@ -1,71 +0,0 @@ -package com.kakao.s2graph.core.Integrate - -import com.kakao.s2graph.core.PostProcess -import play.api.libs.json.{JsValue, Json} - -import scala.concurrent.Await -import scala.util.Random - - -class VertexTestHelper extends IntegrateCommon { - - import TestUtil._ - import VertexTestHelper._ - - test("vertex") { - val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0) - val (serviceName, columnName) = (testServiceName, testColumnName) - - val data = vertexInsertsPayload(serviceName, columnName, ids) - val payload = Json.parse(Json.toJson(data).toString) - println(payload) - - val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName)) - Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime) - - val res = graph.getVertices(vertices).map { vertices => - PostProcess.verticesToJson(vertices) - } - - val ret = Await.result(res, HttpRequestWaitingTime) - val fetched = ret.as[Seq[JsValue]] - for { - (d, f) <- data.zip(fetched) - } yield { - (d \ "id") should be(f \ "id") - ((d \ "props") \ "age") should be((f \ "props") \ "age") - } - } - - object VertexTestHelper { - def vertexQueryJson(serviceName: String, columnName: String, ids: Seq[Int]) = { - Json.parse( - s""" - |[ - |{"serviceName": "$serviceName", "columnName": "$columnName", "ids": [${ids.mkString(",")} - ]} - |] - """.stripMargin) - } - - def vertexInsertsPayload(serviceName: String, columnName: String, ids: Seq[Int]): Seq[JsValue] = { - ids.map { id => - Json.obj("id" -> id, "props" -> randomProps, "timestamp" -> System.currentTimeMillis()) - } - } - - val vertexPropsKeys = List( - ("age", "int") - ) - - def randomProps() = { - (for { - (propKey, propType) <- vertexPropsKeys - } yield { - propKey -> Random.nextInt(100) - }).toMap - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala deleted file mode 100644 index 2028c44..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ /dev/null @@ -1,136 +0,0 @@ -package com.kakao.s2graph.core.Integrate - -import java.util.concurrent.TimeUnit - -import org.scalatest.BeforeAndAfterEach -import play.api.libs.json.{JsObject, JsValue, Json} - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { - - import TestUtil._ - import WeakLabelDeleteHelper._ - - test("test weak consistency select") { - var result = getEdgesSync(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size should be(4) - result = getEdgesSync(query(10)) - println(result) - (result \ "results").as[List[JsValue]].size should be(2) - } - - test("test weak consistency delete") { - var result = getEdgesSync(query(0)) - println(result) - - /** expect 4 edges */ - (result \ "results").as[List[JsValue]].size should be(4) - val edges = (result \ "results").as[List[JsObject]] - val edgesToStore = parser.toEdges(Json.toJson(edges), "delete") - val rets = graph.mutateEdges(edgesToStore, withWait = true) - Await.result(rets, Duration(20, TimeUnit.MINUTES)) - - /** expect noting */ - result = getEdgesSync(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size should be(0) - - /** insert should be ignored */ - /** - * I am wondering if this is right test case - * This makes sense because hbase think cell is deleted when there are - * insert/delete with same timestamp(version) on same cell. - * This can be different on different storage system so I think - * this test should be removed. - */ -// val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert") -// val rets2 = graph.mutateEdges(edgesToStore2, withWait = true) -// Await.result(rets2, Duration(20, TimeUnit.MINUTES)) -// -// result = getEdgesSync(query(0)) -// (result \ "results").as[List[JsValue]].size should be(0) - } - - - test("test weak consistency deleteAll") { - val deletedAt = 100 - var result = getEdgesSync(query(20, "in", testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size should be(3) - - val json = Json.arr(Json.obj("label" -> testLabelNameWeak, - "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) - println(json) - deleteAllSync(json) - - result = getEdgesSync(query(11, "out")) - (result \ "results").as[List[JsValue]].size should be(0) - - result = getEdgesSync(query(12, "out")) - (result \ "results").as[List[JsValue]].size should be(0) - - result = getEdgesSync(query(10, "out")) - - // 10 -> out -> 20 should not be in result. - (result \ "results").as[List[JsValue]].size should be(1) - (result \\ "to").size should be(1) - (result \\ "to").head.as[String] should be("21") - - result = getEdgesSync(query(20, "in", testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size should be(0) - - insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) - - result = getEdgesSync(query(20, "in", testTgtColumnName)) - (result \ "results").as[List[JsValue]].size should be(3) - } - - - // called by each test, each - override def beforeEach = initTestData() - - // called by start test, once - - override def initTestData(): Unit = { - super.initTestData() - insertEdgesSync(bulkEdges(): _*) - } - - object WeakLabelDeleteHelper { - - def bulkEdges(startTs: Int = 0) = Seq( - toEdge(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}"""), - toEdge(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}"""), - toEdge(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}"""), - toEdge(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}"""), - toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""), - toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""), - toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""), - toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""") - ) - - def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$columnName", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelNameWeak}", - "direction": "${direction}", - "offset": 0, - "limit": 10, - "duplicate": "raw" - } - ]] - }""") - } - -} -
