Repository: incubator-s2graph Updated Branches: refs/heads/master b89567606 -> f74c224ac
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala index 8d95e77..1d4e195 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} import org.apache.s2graph.core.types.TargetVertexId -import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex} +import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex} -class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { +class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEdge] { def statusCodeWithOp(byte: Byte): (Byte, Byte) = { val statusCode = byte >> 4 @@ -73,7 +73,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) val pendingEdge = - Edge(Vertex(srcVertexId, cellVersion), + graph.newEdge(Vertex(srcVertexId, cellVersion), Vertex(tgtVertexId, cellVersion), label, labelWithDir.dir, pendingEdgeOp, cellVersion, pendingEdgeProps.toMap, @@ -84,7 +84,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) } - SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), + graph.newSnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode, pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala index 6321fef..3032d9e 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,40 +27,41 @@ import org.scalatest.FunSuite import play.api.libs.json.{JsObject, Json} class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { + import Edge._ initTests() - val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "test", 1.toByte, "true", "boolean") - val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "test", 3.toByte, "-1", "long") - - test("toLogString") { - val testServiceName = serviceNameV2 - val testLabelName = labelNameV2 - val bulkQueries = List( - ("1445240543366", "update", "{\"is_blocked\":true}"), - ("1445240543362", "insert", "{\"is_hidden\":false}"), - ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), - ("1445240543363", "delete", "{}"), - ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) - - val (srcId, tgtId, labelName) = ("1", "2", testLabelName) - - val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { - val properties = fromJsonToProperties(Json.parse(props).as[JsObject]) - Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString - }).mkString("\n") - - val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + testLabelName + - "\",\"service\":\"" + testServiceName + "\"" - val expected = Seq( - Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_blocked\":true}"), - Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false}"), - Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false,\"weight\":10}"), - Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + attachedProps + "}"), - Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"time\":1,\"weight\":-10}") - ).map(_.mkString("\t")).mkString("\n") - - assert(bulkEdge === expected) - } + val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "is_blocked", 1.toByte, "true", "boolean") + val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "time", 3.toByte, "-1", "long") + +// test("toLogString") { +// val testServiceName = serviceNameV2 +// val testLabelName = labelNameV2 +// val bulkQueries = List( +// ("1445240543366", "update", "{\"is_blocked\":true}"), +// ("1445240543362", "insert", "{\"is_hidden\":false}"), +// ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), +// ("1445240543363", "delete", "{}"), +// ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) +// +// val (srcId, tgtId, labelName) = ("1", "2", testLabelName) +// +// val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { +// val properties = fromJsonToProperties(Json.parse(props).as[JsObject]) +// Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString +// }).mkString("\n") +// +// val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + testLabelName + +// "\",\"service\":\"" + testServiceName + "\"" +// val expected = Seq( +// Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_blocked\":true}"), +// Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false}"), +// Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false,\"weight\":10}"), +// Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + attachedProps + "}"), +// Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"time\":1,\"weight\":-10}") +// ).map(_.mkString("\t")).mkString("\n") +// +// assert(bulkEdge === expected) +// } test("buildOperation") { val schemaVersion = "v2" @@ -72,7 +73,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val newVersion = 0L val newPropsWithTs = Map( @@ -98,7 +100,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val newVersion = 0L val newPropsWithTs = Map( @@ -124,7 +127,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val newVersion = 0L val newPropsWithTs = propsWithTs @@ -155,10 +159,13 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) ) - val snapshotEdge = - Option(Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) + val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) + + val snapshotEdge = Option(_snapshotEdge) + + + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) logger.info(edgeMutate.toLogString) @@ -186,10 +193,12 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) ) - val snapshotEdge = - Option(Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs)) + val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) + + val snapshotEdge = Option(_snapshotEdge) + + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - val requestEdge = Edge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) logger.info(edgeMutate.toLogString) @@ -199,365 +208,3 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { assert(edgeMutate.edgesToDelete.isEmpty) } } - -//import com.kakao.s2graph.core.types._ -//import org.hbase.async.PutRequest -//import org.scalatest.{FunSuite, Matchers} -// -///** -// * Created by shon on 5/29/15. -// */ -//class EdgeTest extends FunSuite with Matchers with TestCommon with TestCommonWithModels { -// -// -// import HBaseType.{VERSION1, VERSION2} -// -// -// def srcVertex(innerVal: InnerValLike)(version: String) = { -// val colId = if (version == VERSION1) column.id.get else columnV2.id.get -// Vertex(SourceVertexId(colId, innerVal), ts) -// } -// -// def tgtVertex(innerVal: InnerValLike)(version: String) = { -// val colId = if (version == VERSION1) column.id.get else columnV2.id.get -// Vertex(TargetVertexId(colId, innerVal), ts) -// } -// -// -// def testEdges(version: String) = { -// val innerVals = if (version == VERSION1) intInnerVals else intInnerValsV2 -// val tgtV = tgtVertex(innerVals.head)(version) -// innerVals.tail.map { intInnerVal => -// val labelWithDirection = if (version == VERSION1) labelWithDir else labelWithDirV2 -// val idxPropsList = if (version == VERSION1) idxPropsLs else idxPropsLsV2 -// idxPropsList.map { idxProps => -// val currentTs = idxProps.toMap.get(0.toByte).get.toString.toLong -// val idxPropsWithTs = idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, currentTs) } -// -// Edge(srcVertex(intInnerVal)(version), tgtV, labelWithDirection, op, currentTs, currentTs, idxPropsWithTs.toMap) -// } -// } -// } -// -// def testPropsUpdate(oldProps: Map[Byte, InnerValLikeWithTs], -// newProps: Map[Byte, InnerValLikeWithTs], -// expected: Map[Byte, Any], -// expectedShouldUpdate: Boolean) -// (f: PropsPairWithTs => (Map[Byte, InnerValLikeWithTs], Boolean))(version: String) = { -// -// val timestamp = newProps.toList.head._2.ts -// val (updated, shouldUpdate) = f((oldProps, newProps, timestamp, version)) -// val rets = for { -// (k, v) <- expected -// } yield { -// v match { -// case v: String => -// v match { -// case "left" => updated.get(k).isDefined && updated(k) == oldProps(k) -// case "right" => updated.get(k).isDefined && updated(k) == newProps(k) -// case "none" => updated.get(k).isEmpty -// } -// case value: InnerValLikeWithTs => updated.get(k).get == value -// case _ => throw new RuntimeException(s"not supported keyword: $v") -// } -// } -// println(rets) -// rets.forall(x => x) && shouldUpdate == expectedShouldUpdate -// } -// -// def testEdgeWithIndex(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = { -// val rets = for { -// edgeForSameTgtVertex <- edges -// } yield { -// val head = edgeForSameTgtVertex.head -// val start = head -// var prev = head -// val rets = for { -// edge <- edgeForSameTgtVertex.tail -// } yield { -// println(s"prevEdge: $prev") -// println(s"currentEdge: $edge") -// val prevEdgeWithIndex = edge.edgesWithIndex -// val edgesWithIndex = edge.edgesWithIndex -// -// /** test encodeing decoding */ -// for { -// edgeWithIndex <- edge.edgesWithIndex -// put <- edgeWithIndex.buildPutsAsync() -// kv <- putToKeyValues(put.asInstanceOf[PutRequest]) -// } { -// val decoded = Edge.toEdge(kv, queryParam, None, Seq()) -// val comp = decoded.isDefined && decoded.get == edge -// println(s"${decoded.get}") -// println(s"$edge") -// println(s"${decoded.get == edge}") -// comp shouldBe true -// -// /** test order -// * same source, target vertex. same indexProps keys. -// * only difference is indexProps values so comparing qualifier is good enough -// * */ -// for { -// prevEdgeWithIndex <- prev.edgesWithIndex -// } { -// println(edgeWithIndex.qualifier) -// println(prevEdgeWithIndex.qualifier) -// println(edgeWithIndex.qualifier.bytes.toList) -// println(prevEdgeWithIndex.qualifier.bytes.toList) -// /** since index of this test label only use 0, 1 as indexProps -// * if 0, 1 is not different then qualifier bytes should be same -// * */ -// val comp = lessThanEqual(edgeWithIndex.qualifier.bytes, prevEdgeWithIndex.qualifier.bytes) -// comp shouldBe true -// } -// } -// prev = edge -// } -// } -// } -// -// def testInvertedEdge(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = { -// val rets = for { -// edgeForSameTgtVertex <- edges -// } yield { -// val head = edgeForSameTgtVertex.head -// val start = head -// var prev = head -// val rets = for { -// edge <- edgeForSameTgtVertex.tail -// } yield { -// println(s"prevEdge: $prev") -// println(s"currentEdge: $edge") -// val prevEdgeWithIndexInverted = prev.toSnapshotEdge -// val edgeWithInvertedIndex = edge.toSnapshotEdge -// /** test encode decoding */ -// -// val put = edgeWithInvertedIndex.buildPutAsync() -// for { -// kv <- putToKeyValues(put) -// } yield { -// val decoded = Edge.toSnapshotEdge(kv, queryParam, None, isInnerCall = false, Seq()) -// val comp = decoded.isDefined && decoded.get == edge -// println(s"${decoded.get}") -// println(s"$edge") -// println(s"${decoded.get == edge}") -// comp shouldBe true -// -// /** no need to test ordering because qualifier only use targetVertexId */ -// } -// prev = edge -// } -// } -// } -// -// test("insert for edgesWithIndex version 2") { -// val version = VERSION2 -// testEdgeWithIndex(testEdges(version))(queryParamV2) -// } -// test("insert for edgesWithIndex version 1") { -// val version = VERSION1 -// testEdgeWithIndex(testEdges(version))(queryParam) -// } -// -// test("insert for edgeWithInvertedIndex version 1") { -// val version = VERSION1 -// testInvertedEdge(testEdges(version))(queryParam) -// } -// -// test("insert for edgeWithInvertedIndex version 2") { -// val version = VERSION2 -// testInvertedEdge(testEdges(version))(queryParamV2) -// } -// -// -// // /** test cases for each operation */ -// -// def oldProps(timestamp: Long, version: String) = { -// Map( -// labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp - 2, timestamp - 2, version), -// 1.toByte -> InnerValLikeWithTs.withLong(0L, timestamp, version), -// 2.toByte -> InnerValLikeWithTs.withLong(1L, timestamp - 1, version), -// 4.toByte -> InnerValLikeWithTs.withStr("old", timestamp - 1, version) -// ) -// } -// -// def newProps(timestamp: Long, version: String) = { -// Map( -// 2.toByte -> InnerValLikeWithTs.withLong(-10L, timestamp, version), -// 3.toByte -> InnerValLikeWithTs.withLong(20L, timestamp, version) -// ) -// } -// -// def deleteProps(timestamp: Long, version: String) = Map( -// labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp, timestamp, version) -// ) -// -// /** upsert */ -// test("Edge.buildUpsert") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "none", -// 2.toByte -> "right", -// 3.toByte -> "right", -// 4.toByte -> "none") -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true -// } -// test("Edge.buildUpsert shouldUpdate false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left", -// 3.toByte -> "none", -// 4.toByte -> "left") -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true -// } -// -// /** update */ -// test("Edge.buildUpdate") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "right", -// 3.toByte -> "right", -// 4.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true -// } -// test("Edge.buildUpdate shouldUpdate false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2) -// val newState = newProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left", -// 3.toByte -> "none", -// 4.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true -// } -// -// /** delete */ -// test("Edge.buildDelete") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2) -// val newState = deleteProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "right", -// 1.toByte -> "none", -// 2.toByte -> "none", -// 4.toByte -> "none" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true -// } -// test("Edge.buildDelete shouldUpdate false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2) -// val newState = deleteProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left", -// 4.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true -// } -// -// /** increment */ -// test("Edge.buildIncrement") { -// val shouldUpdate = true -// val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte) -// val newState = newProps(ts + 1, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> InnerValLikeWithTs.withLong(-9L, ts - 1, VERSION2), -// 3.toByte -> "right" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true -// } -// test("Edge.buildIncrement shouldRepalce false") { -// val shouldUpdate = false -// val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte) -// val newState = newProps(ts - 10, VERSION2) -// val expected = Map( -// labelMeta.lastDeletedAt -> "left", -// 1.toByte -> "left", -// 2.toByte -> "left" -// ) -// testPropsUpdate(oldState, newState, expected, shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true -// } -// -// test("Edge`s srcVertex") { -// -// val version = VERSION2 -// val srcId = InnerVal.withLong(10, version) -// val tgtId = InnerVal.withStr("abc", version) -// val srcColumn = columnV2 -// val tgtColumn = tgtColumnV2 -// val srcVertexId = VertexId(srcColumn.id.get, srcId) -// val tgtVertexId = VertexId(tgtColumn.id.get, tgtId) -// -// val srcVertex = Vertex(srcVertexId) -// val tgtVertex = Vertex(tgtVertexId) -// -// val labelId = undirectedLabelV2.id.get -// -// val outDir = LabelWithDirection(labelId, GraphUtil.directions("out")) -// val inDir = LabelWithDirection(labelId, GraphUtil.directions("in")) -// val bothDir = LabelWithDirection(labelId, GraphUtil.directions("undirected")) -// -// val op = GraphUtil.operations("insert") -// -// -// val bothEdge = Edge(srcVertex, tgtVertex, bothDir) -// println(s"edge: $bothEdge") -// bothEdge.relatedEdges.foreach { edge => -// println(edge) -// } -// -// } -// test("edge buildIncrementBulk") { -// -// /** -// * 172567371 List(97, 74, 2, 117, -74, -44, -76, 0, 0, 4, 8, 2) -//169116518 List(68, -110, 2, 117, -21, 124, -103, 0, 0, 4, 9, 2) -//11646834 List(17, 33, 2, 127, 78, 72, -115, 0, 0, 4, 9, 2) -//148171217 List(62, 54, 2, 119, 43, 22, 46, 0, 0, 4, 9, 2) -//116315188 List(41, 86, 2, 121, 17, 43, -53, 0, 0, 4, 9, 2) -//180667876 List(48, -82, 2, 117, 59, 58, 27, 0, 0, 4, 8, 2) -//4594410 List(82, 29, 2, 127, -71, -27, 21, 0, 0, 4, 8, 2) -//151435444 List(1, 105, 2, 118, -7, 71, 75, 0, 0, 4, 8, 2) -//168460895 List(67, -35, 2, 117, -11, 125, -96, 0, 0, 4, 9, 2) -//7941614 List(115, 67, 2, 127, -122, -46, 17, 0, 0, 4, 8, 2) -//171169732 List(61, -42, 2, 117, -52, 40, 59, 0, 0, 4, 9, 2) -//174381375 List(91, 2, 2, 117, -101, 38, -64, 0, 0, 4, 9, 2) -//12754019 List(9, -80, 2, 127, 61, 99, -100, 0, 0, 4, 9, 2) -//175518092 List(111, 32, 2, 117, -119, -50, 115, 0, 0, 4, 8, 2) -//174748531 List(28, -81, 2, 117, -107, -116, -116, 0, 0, 4, 8, 2) -// -// */ -// // val incrementsOpt = Edge.buildIncrementDegreeBulk("169116518", "talk_friend_long_term_agg_test", "out", 10) -// // -// // for { -// // increments <- incrementsOpt -// // increment <- increments -// // (cf, qs) <- increment.getFamilyMapOfLongs -// // (q, v) <- qs -// // } { -// // println(increment.getRow.toList) -// // println(q.toList) -// // println(v) -// // } -// //List(68, -110, -29, -4, 116, -24, 124, -37, 0, 0, 52, -44, 2) -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index c84ad6e..cc08b62 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -79,6 +79,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { Label.findByName(labelName, useCache = false) match { case None => val json = Json.parse(create) + logger.info(s">> Create Label") + logger.info(create) val tryRes = for { labelArgs <- parser.toLabelElements(json) label <- (management.createLabel _).tupled(labelArgs) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala index dc5dc2e..5236415 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -83,7 +83,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { val json = Json.arr(Json.obj("label" -> testLabelNameWeak, "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) - println(json) + deleteAllSync(json) result = getEdgesSync(query(11, "out")) @@ -99,8 +99,9 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { (result \\ "to").size should be(1) (result \\ "to").head.as[String] should be("21") + println("\n" * 10) result = getEdgesSync(query(20, "in", testTgtColumnName)) - println(result) + (result \ "results").as[List[JsValue]].size should be(0) insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala index d70a08b..9576af2 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -81,7 +81,8 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { /** test for each version */ val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "phone_number" -> "1234") val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val edge = Edge(srcVertex, tgtVertex, label, dir, 0.toByte, ts, propsInner) + val edge = graph.newEdge(srcVertex, tgtVertex, label, dir, 0.toByte, ts, propsInner) + val f = validate(label)(edge) _ /** labelName label is long-long relation */ @@ -100,19 +101,22 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { /** test for each version */ var js = Json.obj("phone_number" -> "") var propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - var edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + var edge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + var f = validate(label)(edge) _ f(s"phone_number = '' ")(true) js = Json.obj("phone_number" -> "010 3167 1897") propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + edge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + f = validate(label)(edge) _ f(s"phone_number = '010 3167 1897' ")(true) js = Json.obj("phone_number" -> "010' 3167 1897") propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + edge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + f = validate(label)(edge) _ f(s"phone_number = '010\\' 3167 1897' ")(true) } @@ -125,7 +129,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { /** test for each version */ val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc") val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) + val edge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner) val f = validate(label)(edge) _ @@ -155,7 +159,8 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { /** test for each version */ val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc") val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val edge = Edge(srcVertex, tgtVertex, label, dir, 0.toByte, ts, propsInner) + val edge = graph.newEdge(srcVertex, tgtVertex, label, dir, 0.toByte, ts, propsInner) + val f = validate(label)(edge) _ f(s"_from = -1 or _to = ${tgtVertex.innerId.value}")(true) @@ -178,12 +183,14 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs val parentPropsInner = Management.toProps(label, parentJs.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs - val grandParentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner) - val parentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner, + val grandParentEdge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner) + + val parentEdge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner, parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0, grandParentEdge.innerLabel))) - val edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner, - parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.innerLabel))) + val edge = graph.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner, + parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.innerLabel))) + println(edge.toString) println(parentEdge.toString) println(grandParentEdge.toString) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index 8e80fd9..acbc689 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -1,103 +1,103 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.hbase - -import org.apache.s2graph.core.mysqls.{Model, Label, LabelIndex, LabelMeta} -import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.{QueryParam, IndexEdge, TestCommonWithModels, Vertex} -import org.scalatest.{FunSuite, Matchers} - - -class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { - initTests() - - val testLabelMeta = LabelMeta(Option(-1), labelV2.id.get, "test", 1.toByte, "0.0", "double") - /** - * check if storage serializer/deserializer can translate from/to bytes array. - * @param l: label for edge. - * @param ts: timestamp for edge. - * @param to: to VertexId for edge. - * @param props: expected props of edge. - */ - def check(l: Label, ts: Long, to: InnerValLike, props: Map[LabelMeta, InnerValLikeWithTs]): Unit = { - val from = InnerVal.withLong(1, l.schemaVersion) - val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) - val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) - val vertex = Vertex(vertexId, ts) - val tgtVertex = Vertex(tgtVertexId, ts) - val labelWithDir = LabelWithDirection(l.id.get, 0) - val labelOpt = Option(l) - - val indexEdge = IndexEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, LabelIndex.DefaultSeq, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion))) - val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(labelOpt, - graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues, l.schemaVersion, None) - - - _indexEdgeOpt should not be empty - indexEdge should be(_indexEdgeOpt.get) - } - - - /** note that props have to be properly set up for equals */ - test("test serializer/deserializer for index edge.") { - val ts = System.currentTimeMillis() - for { - l <- Seq(label, labelV2, labelV3, labelV4) - } { - val to = InnerVal.withLong(101, l.schemaVersion) - val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) - val props = Map(LabelMeta.timestamp -> tsInnerValWithTs, - testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) - - check(l, ts, to, props) - } - } - - test("test serializer/deserializer for degree edge.") { - val ts = System.currentTimeMillis() - for { - l <- Seq(label, labelV2, labelV3, labelV4) - } { - val to = InnerVal.withStr("0", l.schemaVersion) - val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) - val props = Map( - LabelMeta.degree -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), - LabelMeta.timestamp -> tsInnerValWithTs) - - check(l, ts, to, props) - } - } - - test("test serializer/deserializer for incrementCount index edge.") { - val ts = System.currentTimeMillis() - for { - l <- Seq(label, labelV2, labelV3, labelV4) - } { - val to = InnerVal.withLong(101, l.schemaVersion) - val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) - val props = Map(LabelMeta.timestamp -> tsInnerValWithTs, - testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), - LabelMeta.count -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) - - - check(l, ts, to, props) - } - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +//package org.apache.s2graph.core.storage.hbase +// +//import org.apache.s2graph.core.mysqls.{Model, Label, LabelIndex, LabelMeta} +//import org.apache.s2graph.core.types._ +//import org.apache.s2graph.core.{QueryParam, IndexEdge, TestCommonWithModels, Vertex} +//import org.scalatest.{FunSuite, Matchers} +// +// +//class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { +// initTests() +// +// val testLabelMeta = LabelMeta(Option(-1), labelV2.id.get, "test", 1.toByte, "0.0", "double") +// /** +// * check if storage serializer/deserializer can translate from/to bytes array. +// * @param l: label for edge. +// * @param ts: timestamp for edge. +// * @param to: to VertexId for edge. +// * @param props: expected props of edge. +// */ +// def check(l: Label, ts: Long, to: InnerValLike, props: Map[LabelMeta, InnerValLikeWithTs]): Unit = { +// val from = InnerVal.withLong(1, l.schemaVersion) +// val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) +// val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) +// val vertex = Vertex(vertexId, ts) +// val tgtVertex = Vertex(tgtVertexId, ts) +// val labelWithDir = LabelWithDirection(l.id.get, 0) +// val labelOpt = Option(l) +// val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion))) +// val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq == LabelIndex.DefaultSeq).head +// val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(labelOpt, +// graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues, l.schemaVersion, None) +// +// +// _indexEdgeOpt should not be empty +// indexEdge should be(_indexEdgeOpt.get) +// } +// +// +// /** note that props have to be properly set up for equals */ +// test("test serializer/deserializer for index edge.") { +// val ts = System.currentTimeMillis() +// for { +// l <- Seq(label, labelV2, labelV3, labelV4) +// } { +// val to = InnerVal.withLong(101, l.schemaVersion) +// val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) +// val props = Map(LabelMeta.timestamp -> tsInnerValWithTs, +// testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) +// +// check(l, ts, to, props) +// } +// } +// +// test("test serializer/deserializer for degree edge.") { +// val ts = System.currentTimeMillis() +// for { +// l <- Seq(label, labelV2, labelV3, labelV4) +// } { +// val to = InnerVal.withStr("0", l.schemaVersion) +// val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) +// val props = Map( +// LabelMeta.degree -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), +// LabelMeta.timestamp -> tsInnerValWithTs) +// +// check(l, ts, to, props) +// } +// } +// +// test("test serializer/deserializer for incrementCount index edge.") { +// val ts = System.currentTimeMillis() +// for { +// l <- Seq(label, labelV2, labelV3, labelV4) +// } { +// val to = InnerVal.withLong(101, l.schemaVersion) +// val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) +// val props = Map(LabelMeta.timestamp -> tsInnerValWithTs, +// testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), +// LabelMeta.count -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) +// +// +// check(l, ts, to, props) +// } +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala index cca3a59..a659ed3 100644 --- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala @@ -32,10 +32,11 @@ object CounterEtlFunctions extends Logging { lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE lazy val config = S2ConfigFactory.config lazy val counterModel = new CounterModel(config) + lazy val graph = new Graph(config)(scala.concurrent.ExecutionContext.Implicits.global) def logToEdge(line: String): Option[Edge] = { for { - elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge] + elem <- graph.toGraphElement(line) if elem.isInstanceOf[Edge] edge <- Some(elem.asInstanceOf[Edge]).filter { x => filterOps.contains(x.op) } @@ -49,7 +50,7 @@ object CounterEtlFunctions extends Logging { * 1427082276804 insert edge 19073318 52453027_93524145648511699 story_user_ch_doc_view {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"} */ for { - elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge] + elem <- graph.toGraphElement(line) if elem.isInstanceOf[Edge] edge <- Some(elem.asInstanceOf[Edge]).filter { x => filterOps.contains(x.op) }
