http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala new file mode 100644 index 0000000..d67de7c --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala @@ -0,0 +1,535 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} +import org.apache.s2graph.core.utils.logger +import org.scalatest.FunSuite + +class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { + initTests() + + test("toLogString") { + val testLabelName = labelNameV2 + val bulkQueries = List( + ("1445240543366", "update", "{\"is_blocked\":true}"), + ("1445240543362", "insert", "{\"is_hidden\":false}"), + ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), + ("1445240543363", "delete", "{}"), + ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) + + val (srcId, tgtId, labelName) = ("1", "2", testLabelName) + + val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { + Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString + }).mkString("\n") + + val expected = Seq( + Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{\"is_blocked\":true}"), + Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false}"), + Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false,\"weight\":10}"), + Seq("1445240543363", "delete", "e", "1", "2", testLabelName), + Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{\"time\":1,\"weight\":-10}") + ).map(_.mkString("\t")).mkString("\n") + + assert(bulkEdge === expected) + } + + test("buildOperation") { + val schemaVersion = "v2" + val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = Vertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + + val snapshotEdge = None + val propsWithTs = Map(timestampProp) + val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val newVersion = 0L + + val newPropsWithTs = Map( + timestampProp, + 1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) + ) + + val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.isDefined) + assert(edgeMutate.edgesToInsert.nonEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: snapshotEdge: None with newProps") { + val schemaVersion = "v2" + val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = Vertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + + val snapshotEdge = None + val propsWithTs = Map(timestampProp) + val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val newVersion = 0L + + val newPropsWithTs = Map( + timestampProp, + 1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) + ) + + val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.isDefined) + assert(edgeMutate.edgesToInsert.nonEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") { + val schemaVersion = "v2" + val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = Vertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + + val snapshotEdge = None + val propsWithTs = Map(timestampProp) + val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val newVersion = 0L + + val newPropsWithTs = propsWithTs + + val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.isEmpty) + assert(edgeMutate.edgesToInsert.isEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: All props older than snapshotEdge's LastDeletedAt") { + val schemaVersion = "v2" + val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = Vertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val oldPropsWithTs = Map( + timestampProp, + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val propsWithTs = Map( + timestampProp, + 3.toByte -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 2), + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val snapshotEdge = + Option(Edge(srcVertex, tgtVertex, labelWithDirV2, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) + + val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val newVersion = 0L + val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.nonEmpty) + assert(edgeMutate.edgesToInsert.isEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") { + val schemaVersion = "v2" + val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = Vertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val oldPropsWithTs = Map( + timestampProp, + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val propsWithTs = Map( + timestampProp, + 3.toByte -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 4), + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val snapshotEdge = + Option(Edge(srcVertex, tgtVertex, labelWithDirV2, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) + + val requestEdge = Edge(srcVertex, tgtVertex, labelWithDirV2, propsWithTs = propsWithTs) + val newVersion = 0L + val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.nonEmpty) + assert(edgeMutate.edgesToInsert.nonEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } +} + +//import com.kakao.s2graph.core.types._ +//import org.hbase.async.PutRequest +//import org.scalatest.{FunSuite, Matchers} +// +///** +// * Created by shon on 5/29/15. +// */ +//class EdgeTest extends FunSuite with Matchers with TestCommon with TestCommonWithModels { +// +// +// import HBaseType.{VERSION1, VERSION2} +// +// +// def srcVertex(innerVal: InnerValLike)(version: String) = { +// val colId = if (version == VERSION1) column.id.get else columnV2.id.get +// Vertex(SourceVertexId(colId, innerVal), ts) +// } +// +// def tgtVertex(innerVal: InnerValLike)(version: String) = { +// val colId = if (version == VERSION1) column.id.get else columnV2.id.get +// Vertex(TargetVertexId(colId, innerVal), ts) +// } +// +// +// def testEdges(version: String) = { +// val innerVals = if (version == VERSION1) intInnerVals else intInnerValsV2 +// val tgtV = tgtVertex(innerVals.head)(version) +// innerVals.tail.map { intInnerVal => +// val labelWithDirection = if (version == VERSION1) labelWithDir else labelWithDirV2 +// val idxPropsList = if (version == VERSION1) idxPropsLs else idxPropsLsV2 +// idxPropsList.map { idxProps => +// val currentTs = idxProps.toMap.get(0.toByte).get.toString.toLong +// val idxPropsWithTs = idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, currentTs) } +// +// Edge(srcVertex(intInnerVal)(version), tgtV, labelWithDirection, op, currentTs, currentTs, idxPropsWithTs.toMap) +// } +// } +// } +// +// def testPropsUpdate(oldProps: Map[Byte, InnerValLikeWithTs], +// newProps: Map[Byte, InnerValLikeWithTs], +// expected: Map[Byte, Any], +// expectedShouldUpdate: Boolean) +// (f: PropsPairWithTs => (Map[Byte, InnerValLikeWithTs], Boolean))(version: String) = { +// +// val timestamp = newProps.toList.head._2.ts +// val (updated, shouldUpdate) = f((oldProps, newProps, timestamp, version)) +// val rets = for { +// (k, v) <- expected +// } yield { +// v match { +// case v: String => +// v match { +// case "left" => updated.get(k).isDefined && updated(k) == oldProps(k) +// case "right" => updated.get(k).isDefined && updated(k) == newProps(k) +// case "none" => updated.get(k).isEmpty +// } +// case value: InnerValLikeWithTs => updated.get(k).get == value +// case _ => throw new RuntimeException(s"not supported keyword: $v") +// } +// } +// println(rets) +// rets.forall(x => x) && shouldUpdate == expectedShouldUpdate +// } +// +// def testEdgeWithIndex(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = { +// val rets = for { +// edgeForSameTgtVertex <- edges +// } yield { +// val head = edgeForSameTgtVertex.head +// val start = head +// var prev = head +// val rets = for { +// edge <- edgeForSameTgtVertex.tail +// } yield { +// println(s"prevEdge: $prev") +// println(s"currentEdge: $edge") +// val prevEdgeWithIndex = edge.edgesWithIndex +// val edgesWithIndex = edge.edgesWithIndex +// +// /** test encodeing decoding */ +// for { +// edgeWithIndex <- edge.edgesWithIndex +// put <- edgeWithIndex.buildPutsAsync() +// kv <- putToKeyValues(put.asInstanceOf[PutRequest]) +// } { +// val decoded = Edge.toEdge(kv, queryParam, None, Seq()) +// val comp = decoded.isDefined && decoded.get == edge +// println(s"${decoded.get}") +// println(s"$edge") +// println(s"${decoded.get == edge}") +// comp shouldBe true +// +// /** test order +// * same source, target vertex. same indexProps keys. +// * only difference is indexProps values so comparing qualifier is good enough +// * */ +// for { +// prevEdgeWithIndex <- prev.edgesWithIndex +// } { +// println(edgeWithIndex.qualifier) +// println(prevEdgeWithIndex.qualifier) +// println(edgeWithIndex.qualifier.bytes.toList) +// println(prevEdgeWithIndex.qualifier.bytes.toList) +// /** since index of this test label only use 0, 1 as indexProps +// * if 0, 1 is not different then qualifier bytes should be same +// * */ +// val comp = lessThanEqual(edgeWithIndex.qualifier.bytes, prevEdgeWithIndex.qualifier.bytes) +// comp shouldBe true +// } +// } +// prev = edge +// } +// } +// } +// +// def testInvertedEdge(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = { +// val rets = for { +// edgeForSameTgtVertex <- edges +// } yield { +// val head = edgeForSameTgtVertex.head +// val start = head +// var prev = head +// val rets = for { +// edge <- edgeForSameTgtVertex.tail +// } yield { +// println(s"prevEdge: $prev") +// println(s"currentEdge: $edge") +// val prevEdgeWithIndexInverted = prev.toSnapshotEdge +// val edgeWithInvertedIndex = edge.toSnapshotEdge +// /** test encode decoding */ +// +// val put = edgeWithInvertedIndex.buildPutAsync() +// for { +// kv <- putToKeyValues(put) +// } yield { +// val decoded = Edge.toSnapshotEdge(kv, queryParam, None, isInnerCall = false, Seq()) +// val comp = decoded.isDefined && decoded.get == edge +// println(s"${decoded.get}") +// println(s"$edge") +// println(s"${decoded.get == edge}") +// comp shouldBe true +// +// /** no need to test ordering because qualifier only use targetVertexId */ +// } +// prev = edge +// } +// } +// } +// +// test("insert for edgesWithIndex version 2") { +// val version = VERSION2 +// testEdgeWithIndex(testEdges(version))(queryParamV2) +// } +// test("insert for edgesWithIndex version 1") { +// val version = VERSION1 +// testEdgeWithIndex(testEdges(version))(queryParam) +// } +// +// test("insert for edgeWithInvertedIndex version 1") { +// val version = VERSION1 +// testInvertedEdge(testEdges(version))(queryParam) +// } +// +// test("insert for edgeWithInvertedIndex version 2") { +// val version = VERSION2 +// testInvertedEdge(testEdges(version))(queryParamV2) +// } +// +// +// // /** test cases for each operation */ +// +// def oldProps(timestamp: Long, version: String) = { +// Map( +// labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp - 2, timestamp - 2, version), +// 1.toByte -> InnerValLikeWithTs.withLong(0L, timestamp, version), +// 2.toByte -> InnerValLikeWithTs.withLong(1L, timestamp - 1, version), +// 4.toByte -> InnerValLikeWithTs.withStr("old", timestamp - 1, version) +// ) +// } +// +// def newProps(timestamp: Long, version: String) = { +// Map( +// 2.toByte -> InnerValLikeWithTs.withLong(-10L, timestamp, version), +// 3.toByte -> InnerValLikeWithTs.withLong(20L, timestamp, version) +// ) +// } +// +// def deleteProps(timestamp: Long, version: String) = Map( +// labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp, timestamp, version) +// ) +// +// /** upsert */ +// test("Edge.buildUpsert") { +// val shouldUpdate = true +// val oldState = oldProps(ts, VERSION2) +// val newState = newProps(ts + 1, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "none", +// 2.toByte -> "right", +// 3.toByte -> "right", +// 4.toByte -> "none") +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true +// } +// test("Edge.buildUpsert shouldUpdate false") { +// val shouldUpdate = false +// val oldState = oldProps(ts, VERSION2) +// val newState = newProps(ts - 10, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "left", +// 2.toByte -> "left", +// 3.toByte -> "none", +// 4.toByte -> "left") +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true +// } +// +// /** update */ +// test("Edge.buildUpdate") { +// val shouldUpdate = true +// val oldState = oldProps(ts, VERSION2) +// val newState = newProps(ts + 1, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "left", +// 2.toByte -> "right", +// 3.toByte -> "right", +// 4.toByte -> "left" +// ) +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true +// } +// test("Edge.buildUpdate shouldUpdate false") { +// val shouldUpdate = false +// val oldState = oldProps(ts, VERSION2) +// val newState = newProps(ts - 10, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "left", +// 2.toByte -> "left", +// 3.toByte -> "none", +// 4.toByte -> "left" +// ) +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true +// } +// +// /** delete */ +// test("Edge.buildDelete") { +// val shouldUpdate = true +// val oldState = oldProps(ts, VERSION2) +// val newState = deleteProps(ts + 1, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "right", +// 1.toByte -> "none", +// 2.toByte -> "none", +// 4.toByte -> "none" +// ) +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true +// } +// test("Edge.buildDelete shouldUpdate false") { +// val shouldUpdate = false +// val oldState = oldProps(ts, VERSION2) +// val newState = deleteProps(ts - 10, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "left", +// 2.toByte -> "left", +// 4.toByte -> "left" +// ) +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true +// } +// +// /** increment */ +// test("Edge.buildIncrement") { +// val shouldUpdate = true +// val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte) +// val newState = newProps(ts + 1, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "left", +// 2.toByte -> InnerValLikeWithTs.withLong(-9L, ts - 1, VERSION2), +// 3.toByte -> "right" +// ) +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true +// } +// test("Edge.buildIncrement shouldRepalce false") { +// val shouldUpdate = false +// val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte) +// val newState = newProps(ts - 10, VERSION2) +// val expected = Map( +// labelMeta.lastDeletedAt -> "left", +// 1.toByte -> "left", +// 2.toByte -> "left" +// ) +// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true +// } +// +// test("Edge`s srcVertex") { +// +// val version = VERSION2 +// val srcId = InnerVal.withLong(10, version) +// val tgtId = InnerVal.withStr("abc", version) +// val srcColumn = columnV2 +// val tgtColumn = tgtColumnV2 +// val srcVertexId = VertexId(srcColumn.id.get, srcId) +// val tgtVertexId = VertexId(tgtColumn.id.get, tgtId) +// +// val srcVertex = Vertex(srcVertexId) +// val tgtVertex = Vertex(tgtVertexId) +// +// val labelId = undirectedLabelV2.id.get +// +// val outDir = LabelWithDirection(labelId, GraphUtil.directions("out")) +// val inDir = LabelWithDirection(labelId, GraphUtil.directions("in")) +// val bothDir = LabelWithDirection(labelId, GraphUtil.directions("undirected")) +// +// val op = GraphUtil.operations("insert") +// +// +// val bothEdge = Edge(srcVertex, tgtVertex, bothDir) +// println(s"edge: $bothEdge") +// bothEdge.relatedEdges.foreach { edge => +// println(edge) +// } +// +// } +// test("edge buildIncrementBulk") { +// +// /** +// * 172567371 List(97, 74, 2, 117, -74, -44, -76, 0, 0, 4, 8, 2) +//169116518 List(68, -110, 2, 117, -21, 124, -103, 0, 0, 4, 9, 2) +//11646834 List(17, 33, 2, 127, 78, 72, -115, 0, 0, 4, 9, 2) +//148171217 List(62, 54, 2, 119, 43, 22, 46, 0, 0, 4, 9, 2) +//116315188 List(41, 86, 2, 121, 17, 43, -53, 0, 0, 4, 9, 2) +//180667876 List(48, -82, 2, 117, 59, 58, 27, 0, 0, 4, 8, 2) +//4594410 List(82, 29, 2, 127, -71, -27, 21, 0, 0, 4, 8, 2) +//151435444 List(1, 105, 2, 118, -7, 71, 75, 0, 0, 4, 8, 2) +//168460895 List(67, -35, 2, 117, -11, 125, -96, 0, 0, 4, 9, 2) +//7941614 List(115, 67, 2, 127, -122, -46, 17, 0, 0, 4, 8, 2) +//171169732 List(61, -42, 2, 117, -52, 40, 59, 0, 0, 4, 9, 2) +//174381375 List(91, 2, 2, 117, -101, 38, -64, 0, 0, 4, 9, 2) +//12754019 List(9, -80, 2, 127, 61, 99, -100, 0, 0, 4, 9, 2) +//175518092 List(111, 32, 2, 117, -119, -50, 115, 0, 0, 4, 8, 2) +//174748531 List(28, -81, 2, 117, -107, -116, -116, 0, 0, 4, 8, 2) +// +// */ +// // val incrementsOpt = Edge.buildIncrementDegreeBulk("169116518", "talk_friend_long_term_agg_test", "out", 10) +// // +// // for { +// // increments <- incrementsOpt +// // increment <- increments +// // (cf, qs) <- increment.getFamilyMapOfLongs +// // (q, v) <- qs +// // } { +// // println(increment.getRow.toList) +// // println(q.toList) +// // println(v) +// // } +// //List(68, -110, -29, -4, 116, -24, 124, -37, 0, 0, 52, -44, 2) +// } +//}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala new file mode 100644 index 0000000..13f0b0a --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala @@ -0,0 +1,226 @@ +package org.apache.s2graph.core.Integrate + +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import play.api.libs.json.{JsObject, Json} + +class CrudTest extends IntegrateCommon { + import CrudHelper._ + import TestUtil._ + + test("test CRUD") { + var tcNum = 0 + var tcString = "" + var bulkQueries = List.empty[(Long, String, String)] + var expected = Map.empty[String, String] + + val curTime = System.currentTimeMillis + val t1 = curTime + 0 + val t2 = curTime + 1 + val t3 = curTime + 2 + val t4 = curTime + 3 + val t5 = curTime + 4 + + val tcRunner = new CrudTestRunner() + tcNum = 1 + tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " + + bulkQueries = List( + (t1, "insert", "{\"time\": 10}"), + (t2, "delete", ""), + (t3, "insert", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 2 + tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " + bulkQueries = List( + (t1, "insert", "{\"time\": 10}"), + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 3 + tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test " + bulkQueries = List( + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", ""), + (t1, "insert", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 4 + tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test " + bulkQueries = List( + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t1, "insert", "{\"time\": 10}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 5 + tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test" + bulkQueries = List( + (t2, "delete", ""), + (t1, "insert", "{\"time\": 10}"), + (t3, "insert", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 6 + tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test " + bulkQueries = List( + (t2, "delete", ""), + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t1, "insert", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 7 + tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test " + bulkQueries = List( + (t1, "update", "{\"time\": 10}"), + (t2, "delete", ""), + (t3, "update", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 8 + tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test " + bulkQueries = List( + (t1, "update", "{\"time\": 10}"), + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 9 + tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test " + bulkQueries = List( + (t2, "delete", ""), + (t1, "update", "{\"time\": 10}"), + (t3, "update", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 10 + tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test" + bulkQueries = List( + (t2, "delete", ""), + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t1, "update", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 11 + tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test " + bulkQueries = List( + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", ""), + (t1, "update", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 12 + tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test " + bulkQueries = List( + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t1, "update", "{\"time\": 10}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 13 + tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test " + bulkQueries = List( + (t5, "update", "{\"is_blocked\": true}"), + (t1, "insert", "{\"is_hidden\": false}"), + (t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"), + (t2, "delete", ""), + (t4, "update", "{\"time\": 1, \"weight\": -10}")) + expected = Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", "is_blocked" -> "true") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + } + + + object CrudHelper { + + class CrudTestRunner { + var seed = 0 + + def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = { + for { + labelName <- List(testLabelName, testLabelName2) + i <- 0 until NumOfEachTest + } { + seed += 1 + val srcId = seed.toString + val tgtId = srcId + + val maxTs = opWithProps.map(t => t._1).max + + /** insert edges */ + println(s"---- TC${tcNum}_init ----") + val bulkEdges = (for ((ts, op, props) <- opWithProps) yield { + TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props) + }) + + TestUtil.insertEdgesSync(bulkEdges: _*) + + for { + label <- Label.findByName(labelName) + direction <- List("out", "in") + cacheTTL <- List(-1L) + } { + val (serviceName, columnName, id, otherId) = direction match { + case "out" => (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId) + case "in" => (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId) + } + + val qId = if (labelName == testLabelName) id else "\"" + id + "\"" + val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL) + + val jsResult = TestUtil.getEdgesSync(query) + + val results = jsResult \ "results" + val deegrees = (jsResult \ "degrees").as[List[JsObject]] + val propsLs = (results \\ "props").seq + (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1) + + val from = (results \\ "from").seq.last.toString.replaceAll("\"", "") + val to = (results \\ "to").seq.last.toString.replaceAll("\"", "") + + from should be(id.toString) + to should be(otherId.toString) + (results \\ "_timestamp").seq.last.as[Long] should be(maxTs) + + for ((key, expectedVal) <- expected) { + propsLs.last.as[JsObject].keys.contains(key) should be(true) + (propsLs.last \ key).toString should be(expectedVal) + } + } + } + } + + def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse( + s""" { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id } ], + "steps": [ [ { + "label": "$labelName", + "direction": "$dir", + "offset": 0, + "limit": 10, + "cacheTTL": $cacheTTL }]]}""") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala new file mode 100644 index 0000000..86bc85c --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -0,0 +1,309 @@ +package org.apache.s2graph.core.Integrate + +import com.typesafe.config._ +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core.rest.{RequestParser, RestHandler} +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{Graph, GraphUtil, Management, PostProcess} +import org.scalatest._ +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + +trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { + + import TestUtil._ + + var graph: Graph = _ + var parser: RequestParser = _ + var management: Management = _ + var config: Config = _ + + override def beforeAll = { + config = ConfigFactory.load() + graph = new Graph(config)(ExecutionContext.Implicits.global) + management = new Management(graph) + parser = new RequestParser(graph.config) + initTestData() + } + + override def afterAll(): Unit = { + graph.shutdown() + } + + /** + * Make Service, Label, Vertex for integrate test + */ + def initTestData() = { + println("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + Management.deleteService(testServiceName) + + // 1. createService + val jsValue = Json.parse(createService) + val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = + parser.toServiceElements(jsValue) + + val tryRes = + management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + println(s">> Service created : $createService, $tryRes") + + val labelNames = Map(testLabelName -> testLabelNameCreate, + testLabelName2 -> testLabelName2Create, + testLabelNameV1 -> testLabelNameV1Create, + testLabelNameWeak -> testLabelNameWeakCreate) + + for { + (labelName, create) <- labelNames + } { + Management.deleteLabel(labelName) + Label.findByName(labelName, useCache = false) match { + case None => + val json = Json.parse(create) + val tryRes = for { + labelArgs <- parser.toLabelElements(json) + label <- (management.createLabel _).tupled(labelArgs) + } yield label + + tryRes.get + case Some(label) => + println(s">> Label already exist: $create, $label") + } + } + + val vertexPropsKeys = List("age" -> "int") + + vertexPropsKeys.map { case (key, keyType) => + Management.addVertexProp(testServiceName, testColumnName, key, keyType) + } + + println("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + } + + + /** + * Test Helpers + */ + object TestUtil { + implicit def ec = scala.concurrent.ExecutionContext.global + + // def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = { + // val arr = for { + // (label, dir, from, to) <- params + // } yield { + // Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to) + // } + // + // val s = Json.toJson(arr) + // s + // } + + def deleteAllSync(jsValue: JsValue) = { + val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json => + val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json) + val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) + + future + }) + + Await.result(future, HttpRequestWaitingTime) + } + + def getEdgesSync(queryJson: JsValue): JsValue = { + logger.info(Json.prettyPrint(queryJson)) + val restHandler = new RestHandler(graph) + Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime) + } + + def insertEdgesSync(bulkEdges: String*) = { + val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) + val jsResult = Await.result(req, HttpRequestWaitingTime) + + jsResult + } + + def insertEdgesAsync(bulkEdges: String*) = { + val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) + req + } + + def toEdge(elems: Any*): String = elems.mkString("\t") + + // common tables + val testServiceName = "s2graph" + val testLabelName = "s2graph_label_test" + val testLabelName2 = "s2graph_label_test_2" + val testLabelNameV1 = "s2graph_label_test_v1" + val testLabelNameWeak = "s2graph_label_test_weak" + val testColumnName = "user_id_test" + val testColumnType = "long" + val testTgtColumnName = "item_id_test" + val testHTableName = "test-htable" + val newHTableName = "new-htable" + val index1 = "idx_1" + val index2 = "idx_2" + + val NumOfEachTest = 30 + val HttpRequestWaitingTime = Duration("60 seconds") + + val createService = s"""{"serviceName" : "$testServiceName"}""" + + val testLabelNameCreate = + s""" + { + "label": "$testLabelName", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "$testColumnName", + "tgtColumnType": "long", + "indices": [ + {"name": "$index1", "propNames": ["weight", "time", "is_hidden", "is_blocked"]}, + {"name": "$index2", "propNames": ["_timestamp"]} + ], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "strong", + "schemaVersion": "v4", + "compressionAlgorithm": "gz", + "hTableName": "$testHTableName" + }""" + + val testLabelName2Create = + s""" + { + "label": "$testLabelName2", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "$testTgtColumnName", + "tgtColumnType": "string", + "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "strong", + "isDirected": false, + "schemaVersion": "v3", + "compressionAlgorithm": "gz" + }""" + + val testLabelNameV1Create = + s""" + { + "label": "$testLabelNameV1", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "${testTgtColumnName}_v1", + "tgtColumnType": "string", + "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "strong", + "isDirected": true, + "schemaVersion": "v1", + "compressionAlgorithm": "gz" + }""" + + val testLabelNameWeakCreate = + s""" + { + "label": "$testLabelNameWeak", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "$testTgtColumnName", + "tgtColumnType": "string", + "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "weak", + "isDirected": true, + "compressionAlgorithm": "gz" + }""" + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala new file mode 100644 index 0000000..232e685 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala @@ -0,0 +1,906 @@ +package org.apache.s2graph.core.Integrate + +import org.apache.s2graph.core.utils.logger +import org.scalatest.BeforeAndAfterEach +import play.api.libs.json.{JsNumber, JsValue, Json} + +class QueryTest extends IntegrateCommon with BeforeAndAfterEach { + + import TestUtil._ + + val insert = "insert" + val e = "e" + val weight = "weight" + val is_hidden = "is_hidden" + + test("interval") { + def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "index": "$index", + "interval": { + "from": [ { "$prop": $fromVal } ], + "to": [ { "$prop": $toVal } ] + } + } + ]] + } + """) + + var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index + (edges \ "size").toString should be("1") + + edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index + (edges \ "size").toString should be("2") + + edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index + (edges \ "size").toString should be("1") + + edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index + (edges \ "size").toString should be("2") + } + + test("get edge with where condition") { + def queryWhere(id: Int, where: String) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": 0, + "limit": 100, + "where": "${where}" + } + ]] + }""") + + var result = getEdgesSync(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) + (result \ "results").as[List[JsValue]].size should be(1) + + result = getEdgesSync(queryWhere(0, "is_hidden=true and _to in (1)")) + (result \ "results").as[List[JsValue]].size should be(1) + + result = getEdgesSync(queryWhere(0, "_from=0")) + (result \ "results").as[List[JsValue]].size should be(2) + + result = getEdgesSync(queryWhere(2, "_from=2 or weight in (-1)")) + (result \ "results").as[List[JsValue]].size should be(2) + + result = getEdgesSync(queryWhere(2, "_from=2 and weight in (10, 20)")) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("get edge exclude") { + def queryExclude(id: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": 0, + "limit": 2 + }, + { + "label": "${testLabelName}", + "direction": "in", + "offset": 0, + "limit": 2, + "exclude": true + } + ]] + }""") + + val result = getEdgesSync(queryExclude(0)) + (result \ "results").as[List[JsValue]].size should be(1) + } + + test("get edge groupBy property") { + def queryGroupBy(id: Int, props: Seq[String]): JsValue = { + Json.obj( + "groupBy" -> props, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName + ) + ) + ) + ) + ) + } + + val result = getEdgesSync(queryGroupBy(0, Seq("weight"))) + (result \ "size").as[Int] should be(2) + val weights = (result \ "results" \\ "groupBy").map { js => + (js \ "weight").as[Int] + } + weights should contain(30) + weights should contain(40) + + weights should not contain (10) + } + + test("edge transform") { + def queryTransform(id: Int, transforms: String) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": 0, + "transform": $transforms + } + ]] + }""") + + var result = getEdgesSync(queryTransform(0, "[[\"_to\"]]")) + (result \ "results").as[List[JsValue]].size should be(2) + + result = getEdgesSync(queryTransform(0, "[[\"weight\"]]")) + (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "weight").map(_.toString).sorted) + + result = getEdgesSync(queryTransform(0, "[[\"_from\"]]")) + (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "from").map(_.toString).sorted) + } + + test("index") { + def queryIndex(ids: Seq[Int], indexName: String) = { + val $from = Json.arr( + Json.obj("serviceName" -> testServiceName, + "columnName" -> testColumnName, + "ids" -> ids)) + + val $step = Json.arr(Json.obj("label" -> testLabelName, "index" -> indexName)) + val $steps = Json.arr(Json.obj("step" -> $step)) + + val js = Json.obj("withScore" -> false, "srcVertices" -> $from, "steps" -> $steps) + js + } + + // weight order + var result = getEdgesSync(queryIndex(Seq(0), "idx_1")) + ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(40)) + + // timestamp order + result = getEdgesSync(queryIndex(Seq(0), "idx_2")) + ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(30)) + } + + // "checkEdges" in { + // running(FakeApplication()) { + // val json = Json.parse( s""" + // [{"from": 0, "to": 1, "label": "$testLabelName"}, + // {"from": 0, "to": 2, "label": "$testLabelName"}] + // """) + // + // def checkEdges(queryJson: JsValue): JsValue = { + // val ret = route(FakeRequest(POST, "/graphs/checkEdges").withJsonBody(queryJson)).get + // contentAsJson(ret) + // } + // + // val res = checkEdges(json) + // val typeRes = res.isInstanceOf[JsArray] + // typeRes must equalTo(true) + // + // val fst = res.as[Seq[JsValue]].head \ "to" + // fst.as[Int] must equalTo(1) + // + // val snd = res.as[Seq[JsValue]].last \ "to" + // snd.as[Int] must equalTo(2) + // } + // } + + + + test("duration") { + def queryDuration(ids: Seq[Int], from: Int, to: Int) = { + val $from = Json.arr( + Json.obj("serviceName" -> testServiceName, + "columnName" -> testColumnName, + "ids" -> ids)) + + val $step = Json.arr(Json.obj( + "label" -> testLabelName, "direction" -> "out", "offset" -> 0, "limit" -> 100, + "duration" -> Json.obj("from" -> from, "to" -> to))) + + val $steps = Json.arr(Json.obj("step" -> $step)) + + Json.obj("srcVertices" -> $from, "steps" -> $steps) + } + + // get all + var result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) + (result \ "results").as[List[JsValue]].size should be(4) + // inclusive, exclusive + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) + (result \ "results").as[List[JsValue]].size should be(3) + + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) + (result \ "results").as[List[JsValue]].size should be(1) + + val bulkEdges = Seq( + toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), + toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), + toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), + toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) + ) + insertEdgesSync(bulkEdges: _*) + + // duration test after udpate + // get all + result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) + (result \ "results").as[List[JsValue]].size should be(4) + + // inclusive, exclusive + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) + (result \ "results").as[List[JsValue]].size should be(3) + + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) + (result \ "results").as[List[JsValue]].size should be(1) + + } + + + test("return tree") { + def queryParents(id: Long) = Json.parse( + s""" + { + "returnTree": true, + "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 2 + } + ],[{ + "label": "$testLabelName", + "direction": "in", + "offset": 0, + "limit": -1 + } + ]] + }""".stripMargin) + + val src = 100 + val tgt = 200 + + insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) + + val result = TestUtil.getEdgesSync(queryParents(src)) + val parents = (result \ "results").as[Seq[JsValue]] + val ret = parents.forall { + edge => (edge \ "parents").as[Seq[JsValue]].size == 1 + } + + ret should be(true) + } + + + +// test("pagination and _to") { +// def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( +// s""" +// { "srcVertices": [ +// { "serviceName": "${testServiceName}", +// "columnName": "${testColumnName}", +// "id": ${id} +// }], +// "steps": [ +// [ { +// "label": "${testLabelName}", +// "direction": "out", +// "offset": $offset, +// "limit": $limit, +// "_to": $to +// } +// ]] +// } +// """) +// +// val src = System.currentTimeMillis().toInt +// +// val bulkEdges = Seq( +// toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), +// toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), +// toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), +// toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) +// ) +// insertEdgesSync(bulkEdges: _*) +// +// var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) +// var edges = (result \ "results").as[List[JsValue]] +// +// edges.size should be(2) +// (edges(0) \ "to").as[Long] should be(4) +// (edges(1) \ "to").as[Long] should be(3) +// +// result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) +// +// edges = (result \ "results").as[List[JsValue]] +// edges.size should be(2) +// (edges(0) \ "to").as[Long] should be(3) +// (edges(1) \ "to").as[Long] should be(2) +// +// result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) +// edges = (result \ "results").as[List[JsValue]] +// edges.size should be(1) +// } + + test("order by") { + def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj( + "srcVertices" -> Json.arr( + Json.obj( + "serviceName" -> testServiceName, + "columnName" -> testColumnName, + "id" -> id + ) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName, + "scoring" -> scoring + ) + ) + ) + ) + ) + def queryOrderBy(id: Int, scoring: Map[String, Int], props: Seq[Map[String, String]]): JsValue = Json.obj( + "orderBy" -> props, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName, + "scoring" -> scoring + ) + ) + ) + ) + ) + + val bulkEdges = Seq( + toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), + toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), + toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), + toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) + ) + + insertEdgesSync(bulkEdges: _*) + + // get edges + val edges = getEdgesSync(queryScore(0, Map("weight" -> 1))) + val orderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC")))) + val ascOrderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC")))) + + val edgesTo = edges \ "results" \\ "to" + val orderByTo = orderByScore \ "results" \\ "to" + val ascOrderByTo = ascOrderByScore \ "results" \\ "to" + + edgesTo should be(Seq(JsNumber(2), JsNumber(1))) + edgesTo should be(orderByTo) + ascOrderByTo should be(Seq(JsNumber(1), JsNumber(2))) + edgesTo.reverse should be(ascOrderByTo) + } + + + test("query with sampling") { + def queryWithSampling(id: Int, sample: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 100, + "sample": $sample + }] + } + ] + }""") + + def twoStepQueryWithSampling(id: Int, sample: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 100, + "sample": $sample + }] + }, + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 100, + "sample": $sample + }] + } + ] + }""") + + def twoQueryWithSampling(id: Int, sample: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 50, + "sample": $sample + }, + { + "label": "$testLabelName2", + "direction": "out", + "offset": 0, + "limit": 50 + }] + } + ] + }""") + + val sampleSize = 2 + val ts = "1442985659166" + val testId = 22 + + val bulkEdges = Seq( + toEdge(ts, insert, e, testId, 122, testLabelName), + toEdge(ts, insert, e, testId, 222, testLabelName), + toEdge(ts, insert, e, testId, 322, testLabelName), + + toEdge(ts, insert, e, testId, 922, testLabelName2), + toEdge(ts, insert, e, testId, 222, testLabelName2), + toEdge(ts, insert, e, testId, 322, testLabelName2), + + toEdge(ts, insert, e, 122, 1122, testLabelName), + toEdge(ts, insert, e, 122, 1222, testLabelName), + toEdge(ts, insert, e, 122, 1322, testLabelName), + toEdge(ts, insert, e, 222, 2122, testLabelName), + toEdge(ts, insert, e, 222, 2222, testLabelName), + toEdge(ts, insert, e, 222, 2322, testLabelName), + toEdge(ts, insert, e, 322, 3122, testLabelName), + toEdge(ts, insert, e, 322, 3222, testLabelName), + toEdge(ts, insert, e, 322, 3322, testLabelName) + ) + + insertEdgesSync(bulkEdges: _*) + + val result1 = getEdgesSync(queryWithSampling(testId, sampleSize)) + (result1 \ "results").as[List[JsValue]].size should be(math.min(sampleSize, bulkEdges.size)) + + val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize)) + (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) + + val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize)) + (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3 + } + test("test query with filterOut query") { + def queryWithFilterOut(id1: String, id2: String) = Json.parse( + s"""{ + | "limit": 10, + | "filterOut": { + | "srcVertices": [{ + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | }], + | "steps": [{ + | "step": [{ + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | }] + | }] + | }, + | "srcVertices": [{ + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id2 + | }], + | "steps": [{ + | "step": [{ + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | }] + | }] + |} + """.stripMargin + ) + + val testId1 = "-23" + val testId2 = "-25" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), + toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), + toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), + toEdge(4, insert, e, testId2, 111, testLabelName, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 333, testLabelName, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 555, testLabelName, Json.obj(weight -> 1)) + ) + logger.debug(s"${bulkEdges.mkString("\n")}") + insertEdgesSync(bulkEdges: _*) + + val rs = getEdgesSync(queryWithFilterOut(testId1, testId2)) + logger.debug(Json.prettyPrint(rs)) + val results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + (results(0) \ "to").toString should be("555") + } + + + /** note that this merge two different label result into one */ + test("weighted union") { + def queryWithWeightedUnion(id1: String, id2: String) = Json.parse( + s""" + |{ + | "limit": 10, + | "weights": [ + | 10, + | 1 + | ], + | "groupBy": ["weight"], + | "queries": [ + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | }, + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id2 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + val testId1 = "1" + val testId2 = "2" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), + toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), + toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), + toEdge(4, insert, e, testId2, 444, testLabelName2, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 666, testLabelName2, Json.obj(weight -> 1)) + ) + + insertEdgesSync(bulkEdges: _*) + + val rs = getEdgesSync(queryWithWeightedUnion(testId1, testId2)) + logger.debug(Json.prettyPrint(rs)) + val results = (rs \ "results").as[List[JsValue]] + results.size should be(2) + (results(0) \ "scoreSum").as[Float] should be(30.0) + (results(0) \ "agg").as[List[JsValue]].size should be(3) + (results(1) \ "scoreSum").as[Float] should be(3.0) + (results(1) \ "agg").as[List[JsValue]].size should be(3) + } + + test("weighted union with options") { + def queryWithWeightedUnionWithOptions(id1: String, id2: String) = Json.parse( + s""" + |{ + | "limit": 10, + | "weights": [ + | 10, + | 1 + | ], + | "groupBy": ["to"], + | "select": ["to", "weight"], + | "filterOut": { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | } + | ] + | } + | ] + | }, + | "queries": [ + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | }, + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id2 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + val testId1 = "-192848" + val testId2 = "-193849" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), + toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), + toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), + toEdge(4, insert, e, testId2, 111, testLabelName2, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 333, testLabelName2, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)) + ) + + insertEdgesSync(bulkEdges: _*) + + val rs = getEdgesSync(queryWithWeightedUnionWithOptions(testId1, testId2)) + logger.debug(Json.prettyPrint(rs)) + val results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + + } + + test("scoreThreshold") { + def queryWithScoreThreshold(id: String, scoreThreshold: Int) = Json.parse( + s"""{ + | "limit": 10, + | "scoreThreshold": $scoreThreshold, + | "groupBy": ["to"], + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | } + | ] + | }, + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + val testId = "-23903" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 101, 102, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 101, 103, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 101, 104, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 102, 103, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 102, 104, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 103, 105, testLabelName, Json.obj(weight -> 10)) + ) + // expected: 104 -> 2, 103 -> 2, 102 -> 1,, 105 -> 1 + insertEdgesSync(bulkEdges: _*) + + var rs = getEdgesSync(queryWithScoreThreshold(testId, 2)) + logger.debug(Json.prettyPrint(rs)) + var results = (rs \ "results").as[List[JsValue]] + results.size should be(2) + + rs = getEdgesSync(queryWithScoreThreshold(testId, 1)) + logger.debug(Json.prettyPrint(rs)) + + results = (rs \ "results").as[List[JsValue]] + results.size should be(4) + } + + def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "direction": "out", + "offset": $offset, + "limit": $limit + } + ]] + } + """) + + def queryGlobalLimit(id: Int, limit: Int): JsValue = Json.obj( + "limit" -> limit, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName + ) + ) + ) + ) + ) + + + // called by each test, each + override def beforeEach = initTestData() + + // called by start test, once + override def initTestData(): Unit = { + super.initTestData() + + insertEdgesSync( + toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)), + toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)), + toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)), + toEdge(4000, insert, e, 2, 1, testLabelName, Json.obj(weight -> 10)), + toEdge(3000, insert, e, 10, 20, testLabelName, Json.obj(weight -> 20)), + toEdge(4000, insert, e, 20, 20, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, -1, 1000, testLabelName), + toEdge(1, insert, e, -1, 2000, testLabelName), + toEdge(1, insert, e, -1, 3000, testLabelName), + toEdge(1, insert, e, 1000, 10000, testLabelName), + toEdge(1, insert, e, 1000, 11000, testLabelName), + toEdge(1, insert, e, 2000, 11000, testLabelName), + toEdge(1, insert, e, 2000, 12000, testLabelName), + toEdge(1, insert, e, 3000, 12000, testLabelName), + toEdge(1, insert, e, 3000, 13000, testLabelName), + toEdge(1, insert, e, 10000, 100000, testLabelName), + toEdge(2, insert, e, 11000, 200000, testLabelName), + toEdge(3, insert, e, 12000, 300000, testLabelName) + ) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala new file mode 100644 index 0000000..1c179bf --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala @@ -0,0 +1,283 @@ +package org.apache.s2graph.core.Integrate + +import java.util.concurrent.TimeUnit + +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.Random + +class StrongLabelDeleteTest extends IntegrateCommon { + + import StrongDeleteUtil._ + import TestUtil._ + + test("Strong consistency select") { + insertEdgesSync(bulkEdges(): _*) + + var result = getEdgesSync(query(0)) + (result \ "results").as[List[JsValue]].size should be(2) + result = getEdgesSync(query(10)) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("Strong consistency deleteAll") { + val deletedAt = 100 + var result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + + println(result) + (result \ "results").as[List[JsValue]].size should be(3) + + val deleteParam = Json.arr( + Json.obj("label" -> testLabelName2, + "direction" -> "in", + "ids" -> Json.arr("20"), + "timestamp" -> deletedAt)) + + deleteAllSync(deleteParam) + + result = getEdgesSync(query(11, direction = "out")) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(12, direction = "out")) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(10, direction = "out")) + println(result) + // 10 -> out -> 20 should not be in result. + (result \ "results").as[List[JsValue]].size should be(1) + (result \\ "to").size should be(1) + (result \\ "to").head.as[String] should be("21") + + result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + + result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + println(result) + + (result \ "results").as[List[JsValue]].size should be(3) + } + + + test("update delete") { + val ret = for { + i <- 0 until testNum + } yield { + val src = (i + 1) * 10000 +// val src = System.currentTimeMillis() + + val (ret, last) = testInner(i, src) + ret should be(true) + ret + } + + ret.forall(identity) + } + + test("update delete 2") { + val src = System.currentTimeMillis() + var ts = 0L + + val ret = for { + i <- 0 until testNum + } yield { + val (ret, lastTs) = testInner(ts, src) + val deletedAt = lastTs + 1 + val deletedAt2 = lastTs + 2 + ts = deletedAt2 + 1 // nex start ts + + ret should be(true) + + val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) + val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) + + val deleteRet = deleteAllSync(deleteAllRequest) + val deleteRet2 = deleteAllSync(deleteAllRequest2) + + val result = getEdgesSync(query(id = src)) + println(result) + + val resultEdges = (result \ "results").as[Seq[JsValue]] + resultEdges.isEmpty should be(true) + + val degreeAfterDeleteAll = getDegree(result) + + degreeAfterDeleteAll should be(0) + degreeAfterDeleteAll === (0) + } + + ret.forall(identity) + } + + /** This test stress out test on degree + * when contention is low but number of adjacent edges are large + * Large set of contention test + */ + test("large degrees") { + val labelName = testLabelName2 + val dir = "out" + val maxSize = 100 + val deleteSize = 10 + val numOfConcurrentBatch = 100 + val src = System.currentTimeMillis() + val tgts = (0 until maxSize).map { ith => src + ith } + val deleteTgts = Random.shuffle(tgts).take(deleteSize) + val insertRequests = tgts.map { tgt => + Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val deleteRequests = deleteTgts.take(deleteSize).map { tgt => + Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val allRequests = Random.shuffle(insertRequests ++ deleteRequests) + // val allRequests = insertRequests ++ deleteRequests + val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val expectedDegree = insertRequests.size - deleteRequests.size + val queryJson = query(id = src) + val result = getEdgesSync(queryJson) + val resultSize = (result \ "size").as[Long] + val resultDegree = getDegree(result) + + // println(result) + + val ret = resultSize == expectedDegree && resultDegree == resultSize + println(s"[MaxSize]: $maxSize") + println(s"[DeleteSize]: $deleteSize") + println(s"[ResultDegree]: $resultDegree") + println(s"[ExpectedDegree]: $expectedDegree") + println(s"[ResultSize]: $resultSize") + ret should be(true) + } + + test("deleteAll") { + val labelName = testLabelName2 + val dir = "out" + val maxSize = 100 + val deleteSize = 10 + val numOfConcurrentBatch = 100 + val src = System.currentTimeMillis() + val tgts = (0 until maxSize).map { ith => src + ith } + val deleteTgts = Random.shuffle(tgts).take(deleteSize) + val insertRequests = tgts.map { tgt => + Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val deleteRequests = deleteTgts.take(deleteSize).map { tgt => + Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val allRequests = Random.shuffle(insertRequests ++ deleteRequests) + val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val deletedAt = System.currentTimeMillis() + val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) + + deleteAllSync(deleteAllRequest) + + val result = getEdgesSync(query(id = src)) + println(result) + val resultEdges = (result \ "results").as[Seq[JsValue]] + resultEdges.isEmpty should be(true) + + val degreeAfterDeleteAll = getDegree(result) + degreeAfterDeleteAll should be(0) + } + + object StrongDeleteUtil { + + val labelName = testLabelName2 +// val labelName = testLabelName + val maxTgtId = 10 + val batchSize = 10 + val testNum = 100 + val numOfBatch = 10 + + def testInner(startTs: Long, src: Long) = { + val lastOps = Array.fill(maxTgtId)("none") + var currentTs = startTs + + val allRequests = for { + ith <- 0 until numOfBatch + jth <- 0 until batchSize + } yield { + currentTs += 1 + + val tgt = Random.nextInt(maxTgtId) + val op = if (Random.nextDouble() < 0.5) "delete" else "update" + + lastOps(tgt) = op + Seq(currentTs, op, "e", src, tgt, labelName, "{}").mkString("\t") + } + + allRequests.foreach(println(_)) + + val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val expectedDegree = lastOps.count(op => op != "delete" && op != "none") + val queryJson = query(id = src) + val result = getEdgesSync(queryJson) + val resultSize = (result \ "size").as[Long] + val resultDegree = getDegree(result) + + println(lastOps.toList) + println(result) + + val ret = resultDegree == expectedDegree && resultSize == resultDegree + if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") + + (ret, currentTs) + } + + def bulkEdges(startTs: Int = 0) = Seq( + toEdge(startTs + 1, "insert", "e", "0", "1", labelName, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", labelName, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", labelName, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", labelName, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", labelName, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", labelName, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", labelName, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", labelName, s"""{"time": 13}""") + ) + + def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, + _labelName: String = labelName, direction: String = "out") = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id + }], + "steps": [ + [ { + "label": "${_labelName}", + "direction": "${direction}", + "offset": 0, + "limit": -1, + "duplicate": "raw" + } + ]] + }""") + + def getDegree(jsValue: JsValue): Long = { + ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala new file mode 100644 index 0000000..b8bfcf6 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala @@ -0,0 +1,71 @@ +package org.apache.s2graph.core.Integrate + +import org.apache.s2graph.core.PostProcess +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.Await +import scala.util.Random + + +class VertexTestHelper extends IntegrateCommon { + + import TestUtil._ + import VertexTestHelper._ + + test("vertex") { + val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0) + val (serviceName, columnName) = (testServiceName, testColumnName) + + val data = vertexInsertsPayload(serviceName, columnName, ids) + val payload = Json.parse(Json.toJson(data).toString) + println(payload) + + val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName)) + Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime) + + val res = graph.getVertices(vertices).map { vertices => + PostProcess.verticesToJson(vertices) + } + + val ret = Await.result(res, HttpRequestWaitingTime) + val fetched = ret.as[Seq[JsValue]] + for { + (d, f) <- data.zip(fetched) + } yield { + (d \ "id") should be(f \ "id") + ((d \ "props") \ "age") should be((f \ "props") \ "age") + } + } + + object VertexTestHelper { + def vertexQueryJson(serviceName: String, columnName: String, ids: Seq[Int]) = { + Json.parse( + s""" + |[ + |{"serviceName": "$serviceName", "columnName": "$columnName", "ids": [${ids.mkString(",")} + ]} + |] + """.stripMargin) + } + + def vertexInsertsPayload(serviceName: String, columnName: String, ids: Seq[Int]): Seq[JsValue] = { + ids.map { id => + Json.obj("id" -> id, "props" -> randomProps, "timestamp" -> System.currentTimeMillis()) + } + } + + val vertexPropsKeys = List( + ("age", "int") + ) + + def randomProps() = { + (for { + (propKey, propType) <- vertexPropsKeys + } yield { + propKey -> Random.nextInt(100) + }).toMap + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala new file mode 100644 index 0000000..81c57c6 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -0,0 +1,136 @@ +package org.apache.s2graph.core.Integrate + +import java.util.concurrent.TimeUnit + +import org.scalatest.BeforeAndAfterEach +import play.api.libs.json.{JsObject, JsValue, Json} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { + + import TestUtil._ + import WeakLabelDeleteHelper._ + + test("test weak consistency select") { + var result = getEdgesSync(query(0)) + println(result) + (result \ "results").as[List[JsValue]].size should be(4) + result = getEdgesSync(query(10)) + println(result) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("test weak consistency delete") { + var result = getEdgesSync(query(0)) + println(result) + + /** expect 4 edges */ + (result \ "results").as[List[JsValue]].size should be(4) + val edges = (result \ "results").as[List[JsObject]] + val edgesToStore = parser.toEdges(Json.toJson(edges), "delete") + val rets = graph.mutateEdges(edgesToStore, withWait = true) + Await.result(rets, Duration(20, TimeUnit.MINUTES)) + + /** expect noting */ + result = getEdgesSync(query(0)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + /** insert should be ignored */ + /** + * I am wondering if this is right test case + * This makes sense because hbase think cell is deleted when there are + * insert/delete with same timestamp(version) on same cell. + * This can be different on different storage system so I think + * this test should be removed. + */ +// val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert") +// val rets2 = graph.mutateEdges(edgesToStore2, withWait = true) +// Await.result(rets2, Duration(20, TimeUnit.MINUTES)) +// +// result = getEdgesSync(query(0)) +// (result \ "results").as[List[JsValue]].size should be(0) + } + + + test("test weak consistency deleteAll") { + val deletedAt = 100 + var result = getEdgesSync(query(20, "in", testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(3) + + val json = Json.arr(Json.obj("label" -> testLabelNameWeak, + "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) + println(json) + deleteAllSync(json) + + result = getEdgesSync(query(11, "out")) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(12, "out")) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(10, "out")) + + // 10 -> out -> 20 should not be in result. + (result \ "results").as[List[JsValue]].size should be(1) + (result \\ "to").size should be(1) + (result \\ "to").head.as[String] should be("21") + + result = getEdgesSync(query(20, "in", testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + + result = getEdgesSync(query(20, "in", testTgtColumnName)) + (result \ "results").as[List[JsValue]].size should be(3) + } + + + // called by each test, each + override def beforeEach = initTestData() + + // called by start test, once + + override def initTestData(): Unit = { + super.initTestData() + insertEdgesSync(bulkEdges(): _*) + } + + object WeakLabelDeleteHelper { + + def bulkEdges(startTs: Int = 0) = Seq( + toEdge(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""") + ) + + def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$columnName", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelNameWeak}", + "direction": "${direction}", + "offset": 0, + "limit": 10, + "duplicate": "raw" + } + ]] + }""") + } + +} +
