Repository: incubator-s2graph Updated Branches: refs/heads/master f7154bac9 -> b89567606
[S2GRAPH-129]: Restrict direct access on Edge's properties from other classes. - add tp3 as dependencies. - make propsWithTs as private. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/292174ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/292174ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/292174ec Branch: refs/heads/master Commit: 292174ecf8da32604d61cb5f8388b8d9eeb54be3 Parents: f7154ba Author: DO YUNG YOON <[email protected]> Authored: Thu Nov 24 10:46:27 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu Nov 24 10:46:27 2016 +0900 ---------------------------------------------------------------------- project/Common.scala | 1 + s2core/build.sbt | 2 +- .../scala/org/apache/s2graph/core/Edge.scala | 172 ++++++++++++++----- .../scala/org/apache/s2graph/core/Graph.scala | 65 +++---- .../org/apache/s2graph/core/PostProcess.scala | 11 +- .../org/apache/s2graph/core/QueryParam.scala | 8 +- .../org/apache/s2graph/core/QueryResult.scala | 9 +- .../org/apache/s2graph/core/S2Property.scala | 29 ++++ .../apache/s2graph/core/S2VertexProperty.scala | 28 +++ .../apache/s2graph/core/mysqls/ColumnMeta.scala | 1 + .../org/apache/s2graph/core/mysqls/Label.scala | 2 +- .../apache/s2graph/core/mysqls/LabelMeta.scala | 1 + .../s2graph/core/parsers/WhereParser.scala | 9 +- .../apache/s2graph/core/storage/Storage.scala | 30 ++-- .../core/storage/hbase/AsynchbaseStorage.scala | 5 +- .../indexedge/tall/IndexEdgeSerializable.scala | 4 +- .../indexedge/wide/IndexEdgeSerializable.scala | 4 +- .../tall/SnapshotEdgeSerializable.scala | 5 +- .../wide/SnapshotEdgeSerializable.scala | 5 +- .../s2graph/core/types/InnerValLike.scala | 88 ++++++++++ .../s2graph/core/Integrate/QueryTest.scala | 2 + .../s2graph/core/parsers/WhereParserTest.scala | 4 +- .../loader/core/CounterEtlFunctions.scala | 2 +- .../rest/play/controllers/EdgeController.scala | 4 +- 24 files changed, 347 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/project/Common.scala ---------------------------------------------------------------------- diff --git a/project/Common.scala b/project/Common.scala index f3dfc68..036d5c9 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -26,6 +26,7 @@ object Common { val hbaseVersion = "1.2.2" val hadoopVersion = "2.7.3" + val tinkerpopVersion = "3.2.3" /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */ val loggingRuntime = Seq( http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 80f37b0..6434acc 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -42,7 +42,7 @@ libraryDependencies ++= Seq( "io.netty" % "netty" % "3.9.4.Final" force(), "org.hbase" % "asynchbase" % "1.7.2" excludeLogging(), "net.bytebuddy" % "byte-buddy" % "1.4.26", - + "org.apache.tinkerpop" % "gremlin-core" % tinkerpopVersion, "org.scalatest" %% "scalatest" % "2.2.4" % "test", "org.specs2" %% "specs2-core" % specs2Version % "test" ) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 8a2784d..b27a05e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -19,14 +19,17 @@ package org.apache.s2graph.core +import java.util + import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.logger +import org.apache.tinkerpop.gremlin.structure import play.api.libs.json.{JsNumber, JsObject, Json} import scala.util.hashing.MurmurHash3 - +import org.apache.tinkerpop.gremlin.structure.{Edge => TpEdge, Direction, Property, Graph => TpGraph} case class SnapshotEdge(srcVertex: Vertex, tgtVertex: Vertex, @@ -34,29 +37,33 @@ case class SnapshotEdge(srcVertex: Vertex, direction: Int, op: Byte, version: Long, - props: Map[LabelMeta, InnerValLikeWithTs], + private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs], pendingEdgeOpt: Option[Edge], statusCode: Byte = 0, lockTs: Option[Long], tsInnerValOpt: Option[InnerValLike] = None) { lazy val labelWithDir = LabelWithDirection(label.id.get, direction) - if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") + if (!propsWithTs.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") // val label = Label.findById(labelWithDir.labelId) lazy val schemaVer = label.schemaVersion - lazy val propsWithoutTs = props.mapValues(_.innerVal) - lazy val ts = props(LabelMeta.timestamp).innerVal.toString().toLong + lazy val propsWithoutTs = propsWithTs.mapValues(_.innerVal) + lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString().toLong + + def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq) + + def allPropsDeleted = Edge.allPropsDeleted(propsWithTs) def toEdge: Edge = { - val ts = props.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version) + val ts = propsWithTs.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version) Edge(srcVertex, tgtVertex, label, direction, op, - version, props, pendingEdgeOpt = pendingEdgeOpt, + version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) } def propsWithName = (for { - (meta, v) <- props + (meta, v) <- propsWithTs jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) @@ -73,7 +80,7 @@ case class IndexEdge(srcVertex: Vertex, op: Byte, version: Long, labelIndexSeq: Byte, - props: Map[LabelMeta, InnerValLikeWithTs], + private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs], tsInnerValOpt: Option[InnerValLike] = None) { // if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") // assert(props.contains(LabelMeta.timeStampSeq)) @@ -82,8 +89,8 @@ case class IndexEdge(srcVertex: Vertex, lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in") lazy val isOutEdge = !isInEdge - lazy val ts = props(LabelMeta.timestamp).innerVal.toString.toLong - lazy val degreeEdge = props.contains(LabelMeta.degree) + lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString.toLong + lazy val degreeEdge = propsWithTs.contains(LabelMeta.degree) lazy val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get @@ -96,7 +103,7 @@ case class IndexEdge(srcVertex: Vertex, /** TODO: make sure call of this class fill props as this assumes */ lazy val orders = for (meta <- labelIndexMetaSeqs) yield { - props.get(meta) match { + propsWithTs.get(meta) match { case None => /** @@ -118,7 +125,7 @@ case class IndexEdge(srcVertex: Vertex, } lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((meta, v) <- props if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal + lazy val metas = for ((meta, v) <- propsWithTs if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } @@ -128,52 +135,111 @@ case class IndexEdge(srcVertex: Vertex, lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length def propsWithName = for { - (meta, v) <- props if meta.seq >= 0 + (meta, v) <- propsWithTs if meta.seq >= 0 jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue - def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, props, tsInnerValOpt = tsInnerValOpt) + def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) // only for debug def toLogString() = { List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t") } + + def property(key: String): Option[InnerValLikeWithTs] = { + label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta)) + } + + def property(labelMeta: LabelMeta): InnerValLikeWithTs = { + propsWithTs.get(labelMeta).getOrElse(label.metaPropsDefaultMapInner(labelMeta)) + } + + def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = { + if (others.isEmpty) propsWithTs + else propsWithTs ++ others + } } case class Edge(srcVertex: Vertex, tgtVertex: Vertex, - label: Label, + innerLabel: Label, dir: Int, op: Byte = GraphUtil.defaultOpByte, version: Long = System.currentTimeMillis(), - propsWithTs: Map[LabelMeta, InnerValLikeWithTs], + private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs], parentEdges: Seq[EdgeWithScore] = Nil, originalEdgeOpt: Option[Edge] = None, pendingEdgeOpt: Option[Edge] = None, statusCode: Byte = 0, lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement { + tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge { - lazy val labelWithDir = LabelWithDirection(label.id.get, dir) -// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") - // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) - lazy val schemaVer = label.schemaVersion + lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) + lazy val schemaVer = innerLabel.schemaVersion lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match { case b: BigDecimal => b.longValue() case l: Long => l case i: Int => i.toLong case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") } + //FIXME lazy val tsInnerVal = tsInnerValOpt.get.value -// propsWithTs(LabelMeta.timestamp).innerVal.value - -// lazy val label = Label.findById(labelWithDir.labelId) lazy val srcId = srcVertex.innerIdVal lazy val tgtId = tgtVertex.innerIdVal - lazy val labelName = label.label + lazy val labelName = innerLabel.label lazy val direction = GraphUtil.fromDirection(dir) + + def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) + + def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq) + + def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = { + if (others.isEmpty) propsWithTs + else propsWithTs ++ others + } + + def propertyValue(key: String): Option[InnerValLikeWithTs] = { + key match { + case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts)) + case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts)) + case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts)) + case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts)) + case _ => + innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta)) + } + } + + def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= { + propsWithTs.getOrElse(labelMeta, innerLabel.metaPropsDefaultMapInner(labelMeta)) + } + + def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + val labelMetas = for { + key <- keys + labelMeta <- innerLabel.metaPropsInvMap.get(key) + } yield labelMeta + + propertyValuesInner(labelMetas) + } + + def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + if (labelMetas.isEmpty) { + innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => + labelMeta -> propertyValueInner(labelMeta) + } + } else { + // This is important since timestamp is required for all edges. + (LabelMeta.timestamp +: labelMetas).map { labelMeta => + labelMeta -> propertyValueInner(labelMeta) + }.toMap + } + } + +// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") + // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) + lazy val properties = toProps() def props = propsWithTs.mapValues(_.innerVal) @@ -181,7 +247,7 @@ case class Edge(srcVertex: Vertex, private def toProps(): Map[String, Any] = { for { - (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner + (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner } yield { labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value } @@ -189,7 +255,7 @@ case class Edge(srcVertex: Vertex, def relatedEdges = { if (labelWithDir.isDirected) { - val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) + val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) if (skipReverse) List(this) else List(this, duplicateEdge) } else { // val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) @@ -204,18 +270,18 @@ case class Edge(srcVertex: Vertex, def srcForVertex = { val belongLabelIds = Seq(labelWithDir.labelId) if (labelWithDir.dir == GraphUtil.directions("in")) { - Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) + Vertex(VertexId(innerLabel.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) } else { - Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) + Vertex(VertexId(innerLabel.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) } } def tgtForVertex = { val belongLabelIds = Seq(labelWithDir.labelId) if (labelWithDir.dir == GraphUtil.directions("in")) { - Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) + Vertex(VertexId(innerLabel.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) } else { - Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) + Vertex(VertexId(innerLabel.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) } } @@ -228,13 +294,13 @@ case class Edge(srcVertex: Vertex, def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) - override def serviceName = label.serviceName + override def serviceName = innerLabel.serviceName override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|") override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|") - override def isAsync = label.isAsync + override def isAsync = innerLabel.isAsync def isDegree = propsWithTs.contains(LabelMeta.degree) @@ -246,11 +312,11 @@ case class Edge(srcVertex: Vertex, def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq)) def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) + IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) } def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) + IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) } /** force direction as out on invertedEdge */ @@ -259,7 +325,7 @@ case class Edge(srcVertex: Vertex, // val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - val ret = SnapshotEdge(smaller, larger, label, GraphUtil.directions("out"), op, version, + val ret = SnapshotEdge(smaller, larger, innerLabel, GraphUtil.directions("out"), op, version, Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs, pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) ret @@ -278,7 +344,7 @@ case class Edge(srcVertex: Vertex, } def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), - "label" -> label.label, "service" -> label.serviceName) + "label" -> innerLabel.label, "service" -> innerLabel.serviceName) def propsWithName = for { @@ -290,7 +356,7 @@ case class Edge(srcVertex: Vertex, def updateTgtVertex(id: InnerValLike) = { val newId = TargetVertexId(tgtVertex.id.colId, id) val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props) - Edge(srcVertex, newTgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) + Edge(srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) } def rank(r: RankParam): Double = @@ -316,8 +382,28 @@ case class Edge(srcVertex: Vertex, def toLogString: String = { val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, allPropsWithName).mkString("\t") + List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t") } + + override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ??? + + override def properties[V](strings: String*): util.Iterator[Property[V]] = ??? + + override def property[V](key: String): Property[V] = ??? + + override def property[V](key: String, value: V): Property[V] = { + property(key, value, System.currentTimeMillis()) + } + + def property[V](key: String, value: V, ts: Long): Property[V] = ??? + + override def remove(): Unit = ??? + + override def graph(): TpGraph = ??? + + override def id(): AnyRef = ??? + + override def label(): String = innerLabel.label } @@ -399,14 +485,14 @@ object Edge { val funcs = requestEdges.map { edge => if (edge.op == GraphUtil.operations("insert")) { - edge.label.consistencyLevel match { + edge.innerLabel.consistencyLevel match { case "strong" => Edge.mergeUpsert _ case _ => Edge.mergeInsertBulk _ } } else if (edge.op == GraphUtil.operations("insertBulk")) { Edge.mergeInsertBulk _ } else if (edge.op == GraphUtil.operations("delete")) { - edge.label.consistencyLevel match { + edge.innerLabel.consistencyLevel match { case "strong" => Edge.mergeDelete _ case _ => throw new RuntimeException("not supported") } @@ -438,7 +524,7 @@ object Edge { val maxTs = prevPropsWithTs.map(_._2.ts).max val newTs = if (maxTs > requestTs) maxTs else requestTs val propsWithTs = prevPropsWithTs ++ - Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs)) + Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs)) val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala index ca32a14..a2b17ef 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -220,7 +220,7 @@ object Graph { case None => 1.0 case Some(timeDecay) => val tsVal = try { - val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMeta) + val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) innerValWithTsOpt.map { innerValWithTs => val innerVal = innerValWithTs.innerVal timeDecay.labelMeta.dataType match { @@ -324,17 +324,7 @@ object Graph { val label = edgeWithScore.label /** Select */ - val mergedPropsWithTs = - if (queryOption.selectColumns.isEmpty) { - label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => - labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal) - } - } else { - val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp)) - propsSelectColumns.foldLeft(initial) { case (prev, labelMeta) => - prev + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, label.metaPropsDefaultMapInner(labelMeta))) - } - } + val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) @@ -414,19 +404,10 @@ object Graph { /** Select */ val mergedPropsWithTs = if (queryOption.selectColumns.isEmpty) { - label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => - labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal) - } + edge.propertyValuesInner() } else { - val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp)) - queryOption.selectColumns.foldLeft(initial) { case (acc, labelMetaName) => - label.metaPropsDefaultMapInnerString.get(labelMetaName) match { - case None => acc - case Some(defaultValue) => - val labelMeta = label.metaPropsInvMap(labelMetaName) - acc + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultValue)) - } - } + val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp)) + edge.propertyValues(queryOption.selectColumns) ++ initial } val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) @@ -475,7 +456,7 @@ object Graph { stepIdx: Int, stepResultLs: Seq[(QueryRequest, StepResult)], parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (createFunc: (EdgeWithScore, Set[LabelMeta]) => T) + (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T) (implicit ev: WithScore[T]): ListBuffer[T] = { import scala.collection._ @@ -500,7 +481,7 @@ object Graph { val propsSelectColumns = (for { column <- queryOption.propsSelectColumns labelMeta <- label.metaPropsInvMap.get(column) - } yield labelMeta).toSet + } yield labelMeta) for { edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) @@ -831,14 +812,14 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache * so use empty cacheKey. * */ - val queryParam = QueryParam(labelName = edge.label.label, + val queryParam = QueryParam(labelName = edge.innerLabel.label, direction = GraphUtil.fromDirection(edge.labelWithDir.dir), tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), cacheTTLInMillis = -1) val q = Query.toQuery(Seq(edge.srcVertex), queryParam) val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) - val storage = getStorage(edge.label) + val storage = getStorage(edge.innerLabel) storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs => val (edgeOpt, kvOpt) = if (kvs.isEmpty) (None, None) @@ -927,7 +908,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { if deleteStepInnerResult.edgeWithScores.nonEmpty } yield { val head = deleteStepInnerResult.edgeWithScores.head - val label = head.edge.label + val label = head.edge.innerLabel val ret = label.schemaVersion match { case HBaseType.VERSION3 | HBaseType.VERSION4 => if (label.consistencyLevel == "strong") { @@ -965,16 +946,18 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { if (filtered.isEmpty) StepResult.Empty else { val head = filtered.head - val label = head.edge.label + val label = head.edge.innerLabel val edgeWithScoreLs = filtered.map { edgeWithScore => val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { case "strong" => - val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ - Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) + val _newPropsWithTs = edgeWithScore.edge.updatePropsWithTs( + Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) + ) + (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) case _ => val oldEdge = edgeWithScore.edge - (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) + (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs()) } val copiedEdge = @@ -1029,11 +1012,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { val (strongEdges, weakEdges) = edgeWithIdxs.partition { case (edge, idx) => val e = edge - e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk") + e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk") } - val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.label.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => - val futures = edgeWithIdxs.groupBy(_._1.label).map { case (label, edgeGroup) => + val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => + val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => val storage = getStorage(label) val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) @@ -1056,10 +1039,10 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") } val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts).map(idx -> _) + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _) } - val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.label }.map { case (label, edgeGroup) => + val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) val storage = getStorage(label) @@ -1087,19 +1070,19 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { val edgesWithIdx = edges.zipWithIndex - val futures = edgesWithIdx.groupBy { case (e, idx) => e.label }.map { case (label, edgeGroup) => + val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) } Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } } def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = { - val label = edge.label + val label = edge.innerLabel val storage = getStorage(label) val kvs = storage.buildDegreePuts(edge, degreeVal) - storage.writeToStorage(edge.label.service.cluster, kvs, withWait = true) + storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true) } def shutdown(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 6c8563c..083159f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -83,7 +83,7 @@ object PostProcess { builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) builder += ("label" -> anyValToJsValue(label.label).get) builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) - builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degree).innerVal.value).get) + builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propertyValueInner(LabelMeta.degree).innerVal.value).get) JsObject(builder) } else { if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get) @@ -95,7 +95,7 @@ object PostProcess { val innerProps = ArrayBuffer.empty[(String, JsValue)] for { - (labelMeta, v) <- edgeWithScore.edge.propsWithTs + (labelMeta, v) <- edgeWithScore.edge.propertyValues() jsValue <- anyValToJsValue(v.innerVal.value) } { innerProps += (labelMeta.name -> jsValue) @@ -126,9 +126,10 @@ object PostProcess { } val innerProps = ArrayBuffer.empty[(String, JsValue)] for { - (labelMeta, v) <- edgeWithScore.edge.propsWithTs - if !checkSelectColumns || queryOption.selectColumnsMap.contains(labelMeta.name) - jsValue <- anyValToJsValue(v.innerVal.value) + (selectColumnName, _) <- queryOption.selectColumnsMap + labelMeta <- label.metaPropsInvMap.get(selectColumnName) + innerValWithTs = edgeWithScore.edge.propertyValueInner(labelMeta) + jsValue <- anyValToJsValue(innerValWithTs.innerVal.value) } { innerProps += (labelMeta.name -> jsValue) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index b481880..7b10709 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -166,11 +166,7 @@ case class EdgeTransformer(jsValue: JsValue) { fieldName match { case LabelMeta.to.name => Option(edge.tgtVertex.innerId) case LabelMeta.from.name => Option(edge.srcVertex.innerId) - case _ => - for { - labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName) - value <- edge.propsWithTs.get(labelMeta) - } yield value.innerVal + case _ => edge.propertyValue(fieldName).map(_.innerVal) } } @@ -376,7 +372,7 @@ case class QueryParam(labelName: String, val propKey = _propKey.split("_parent.").last val padding = Try(_padding.trim.toLong).getOrElse(0L) - val labelMeta = edge.label.metaPropsInvMap.getOrElse(propKey, throw new RuntimeException(s"$propKey not found in ${edge} labelMetas.")) + val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(propKey, throw new RuntimeException(s"$propKey not found in ${edge} labelMetas.")) val propVal = if (InnerVal.isNumericType(labelMeta.dataType)) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index dd7e45d..d8416c2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -83,15 +83,8 @@ case class EdgeWithScore(edge: Edge, accumulatedScores: Map[String, Double] = Map.empty) { def toValue(keyName: String): Option[Any] = keyName match { - case "from" | "_from" => Option(edge.srcId) - case "to" | "_to" => Option(edge.tgtId) - case "label" => Option(label.label) - case "direction" => Option(edge.dir) case "score" => Option(score) - case _ => - label.metaPropsInvMap.get(keyName).flatMap { labelMeta => - edge.propsWithTs.get(labelMeta).orElse(label.metaPropsDefaultMapInner.get(labelMeta)).map(_.innerVal.value) - } + case _ => edge.propertyValue(keyName).map(_.innerVal.value) } def toValues(keyNames: Seq[String]): Seq[Option[Any]] = for { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala new file mode 100644 index 0000000..67a9d4c --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala @@ -0,0 +1,29 @@ +package org.apache.s2graph.core + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.types.CanInnerValLike +import org.apache.tinkerpop.gremlin.structure.{Element, Property} + + +case class S2Property[V](element: Element, + labelMeta: LabelMeta, + key: String, + value: V, + ts: Long = System.currentTimeMillis()) extends Property[V] { + + import CanInnerValLike._ + lazy val innerVal = anyToInnerValLike.toInnerVal(value, labelMeta.label.schemaVersion) + + def bytes: Array[Byte] = { + innerVal.bytes + } + + def bytesWithTs: Array[Byte] = { + Bytes.add(innerVal.bytes, Bytes.toBytes(ts)) + } + + override def isPresent: Boolean = ??? + + override def remove(): Unit = ??? +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala new file mode 100644 index 0000000..e6da3f6 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala @@ -0,0 +1,28 @@ +package org.apache.s2graph.core + +import java.util + +import org.apache.s2graph.core.mysqls.ColumnMeta +import org.apache.s2graph.core.types.CanInnerValLike +import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty, Vertex => TpVertex} + +case class S2VertexProperty[V](element: TpVertex, + columnMeta: ColumnMeta, + key: String, + value: V) extends VertexProperty[V] { + implicit val encodingVer = columnMeta.serviceColumn.schemaVersion + val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(value) + def toBytes: Array[Byte] = { + innerVal.bytes + } + + override def properties[U](strings: String*): util.Iterator[Property[U]] = ??? + + override def property[V](s: String, v: V): Property[V] = ??? + + override def remove(): Unit = ??? + + override def id(): AnyRef = ??? + + override def isPresent: Boolean = ??? +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala index 9af6243..f6c174d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala @@ -125,5 +125,6 @@ object ColumnMeta extends Model[ColumnMeta] { } case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) { + lazy val serviceColumn = ServiceColumn.findById(columnId) lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index 09d15d7..4970912 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -362,7 +362,7 @@ case class Label(id: Option[Int], label: String, } yield prop.name -> innerVal).toMap lazy val metaPropsDefaultMapInner = (for { - prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq) + prop <- metaPropsInner innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis()) } yield prop -> innerVal).toMap lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala index 4a7e931..6636649 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala @@ -191,6 +191,7 @@ case class LabelMeta(id: Option[Int], seq: Byte, defaultValue: String, dataType: String) { + lazy val label = Label.findById(labelId) lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType) override def equals(other: Any): Boolean = { if (!other.isInstanceOf[LabelMeta]) false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index effb94b..aa018a9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -36,22 +36,19 @@ trait ExtractValue { def propToInnerVal(edge: Edge, key: String) = { val (propKey, parentEdge) = findParentEdge(edge, key) - val label = parentEdge.label + val label = parentEdge.innerLabel val metaPropInvMap = label.metaPropsInvMap val labelMeta = metaPropInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) labelMeta match { case LabelMeta.from => parentEdge.srcVertex.innerId case LabelMeta.to => parentEdge.tgtVertex.innerId - case _ => parentEdge.propsWithTs.get(labelMeta) match { - case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion) - case Some(edgeVal) => edgeVal.innerVal - } + case _ => parentEdge.propertyValueInner(labelMeta).innerVal } } def valueToCompare(edge: Edge, key: String, value: String) = { - val label = edge.label + val label = edge.innerLabel if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value) else { val (propKey, _) = findParentEdge(edge, key) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index f2b07cd..b1ef11d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -301,7 +301,7 @@ abstract class Storage[Q, R](val graph: Graph, val edgeWithIdxs = _edges.zipWithIndex val grouped = edgeWithIdxs.groupBy { case (edge, idx) => - (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) + (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => @@ -314,7 +314,7 @@ abstract class Storage[Q, R](val graph: Graph, //TODO: decide what we will do on failure on vertex put val puts = buildVertexPutsAsync(head) - val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, withWait) + val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) Seq(edgeFuture, vertexFuture) case Nil => Nil } @@ -358,7 +358,7 @@ abstract class Storage[Q, R](val graph: Graph, // TODO:: remove after code review: unreachable code if (!checkConsistency) { - val zkQuorum = edges.head.label.hbaseZkAddr + val zkQuorum = edges.head.innerLabel.hbaseZkAddr val futures = edges.map { edge => val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge)) @@ -672,7 +672,7 @@ abstract class Storage[Q, R](val graph: Graph, if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p")) else { val releaseLockEdgePuts = snapshotEdgeSerializer(releaseLockEdge).toKeyValues - writeToStorage(squashedEdge.label.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith { + writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith { case ex: Exception => logger.error(s"ReleaseLock RPC Failed.") throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed") @@ -719,7 +719,7 @@ abstract class Storage[Q, R](val graph: Graph, val p = Random.nextDouble() if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p")) else - writeToStorage(squashedEdge.label.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret => + writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret => if (ret) { debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate) } else { @@ -746,7 +746,7 @@ abstract class Storage[Q, R](val graph: Graph, edgeMutate: EdgeMutate): Future[Boolean] = { def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { - writeToStorage(squashedEdge.label.hbaseZkAddr, kvs, withWait = withWait).map { ret => + writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, kvs, withWait = withWait).map { ret => if (ret) { debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate) } else { @@ -789,7 +789,7 @@ abstract class Storage[Q, R](val graph: Graph, if (stepInnerResult.isEmpty) Future.successful(true) else { val head = stepInnerResult.edgeWithScores.head - val zkQuorum = head.edge.label.hbaseZkAddr + val zkQuorum = head.edge.innerLabel.hbaseZkAddr val futures = for { edgeWithScore <- stepInnerResult.edgeWithScores } yield { @@ -857,7 +857,7 @@ abstract class Storage[Q, R](val graph: Graph, } } else { snapshotEdgeOpt.flatMap { snapshotEdge => - if (Edge.allPropsDeleted(snapshotEdge.props)) None + if (snapshotEdge.allPropsDeleted) None else { val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) @@ -982,7 +982,7 @@ abstract class Storage[Q, R](val graph: Graph, /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache * so use empty cacheKey. * */ - val queryParam = QueryParam(labelName = edge.label.label, + val queryParam = QueryParam(labelName = edge.innerLabel.label, direction = GraphUtil.fromDirection(edge.labelWithDir.dir), tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), cacheTTLInMillis = -1) @@ -1075,14 +1075,14 @@ abstract class Storage[Q, R](val graph: Graph, /** IndexEdge */ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) - val _indexedEdge = indexedEdge.copy(props = newProps) + val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))) + val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) } def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) - val _indexedEdge = indexedEdge.copy(props = newProps) + val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))) + val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) } @@ -1096,7 +1096,7 @@ abstract class Storage[Q, R](val graph: Graph, } def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = { - val storeVertex = edge.label.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false) + val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false) if (storeVertex) { if (edge.op == GraphUtil.operations("delete")) @@ -1111,7 +1111,7 @@ abstract class Storage[Q, R](val graph: Graph, def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = { val kvs = edge.edgesWithIndexValid.flatMap { _indexEdge => val newProps = Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer)) - val indexEdge = _indexEdge.copy(props = newProps) + val indexEdge = _indexEdge.copy(propsWithTs = newProps) indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index e63dfea..b0287d5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -251,8 +251,7 @@ class AsynchbaseStorage(override val graph: Graph, val snapshotEdge = edge.toSnapshotEdge snapshotEdgeSerializer(snapshotEdge) } else { - val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.label, edge.dir, - edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs) + val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq) indexEdgeSerializer(indexEdge) } @@ -435,7 +434,7 @@ class AsynchbaseStorage(override val graph: Graph, relEdge <- edge.relatedEdges edgeWithIndex <- relEdge.edgesWithIndexValid } yield { - val countWithTs = edge.propsWithTs(LabelMeta.count) + val countWithTs = edge.propertyValueInner(LabelMeta.count) val countVal = countWithTs.innerVal.toString().toLong val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index cd242dc..2d49c11 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byt override def toValue: Array[Byte] = if (indexEdge.degreeEdge) - longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong) + longToBytes(indexEdge.property(LabelMeta.degree).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong) + longToBytes(indexEdge.property(LabelMeta.count).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 211b159..f85159b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -60,9 +60,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byt override def toValue: Array[Byte] = if (indexEdge.degreeEdge) - longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong) + longToBytes(indexEdge.property(LabelMeta.degree).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong) + longToBytes(indexEdge.property(LabelMeta.count).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index fc84469..e71760d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -36,8 +36,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ val byte = (((statusCode << 4) | op).toByte) Array.fill(1)(byte.toByte) } - def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), - propsToKeyValuesWithTs(snapshotEdge.props.toList)) + def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), snapshotEdge.propsToKeyValuesWithTs) override def toRowKey: Array[Byte] = { val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes @@ -55,7 +54,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) val versionBytes = Array.empty[Byte] - val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) + val propsBytes = pendingEdge.serializePropsWithTs() val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index 4ceb4a8..ee2645a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -41,8 +41,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ val byte = (((statusCode << 4) | op).toByte) Array.fill(1)(byte.toByte) } - def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), - propsToKeyValuesWithTs(snapshotEdge.props.toList)) + def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), snapshotEdge.propsToKeyValuesWithTs) override def toRowKey: Array[Byte] = { @@ -62,7 +61,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) val versionBytes = Array.empty[Byte] - val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) + val propsBytes = pendingEdge.serializePropsWithTs() val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala index 1c58086..d90cf8e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -261,3 +261,91 @@ case class InnerValLikeWithTs(innerVal: InnerValLike, ts: Long) Bytes.add(innerVal.bytes, Bytes.toBytes(ts)) } } + +trait CanInnerValLike[A] { + def toInnerVal(element: A)(implicit encodingVer: String): InnerValLike +} +object CanInnerValLike { + implicit val encodingVer = "v2" + + def validate(element: Any, classType: String): Boolean = { + import InnerVal._ + classType match { + case BLOB => element.isInstanceOf[Array[Byte]] + case STRING => element.isInstanceOf[String] + case DOUBLE => element.isInstanceOf[Double] || element.isInstanceOf[BigDecimal] + case FLOAT => element.isInstanceOf[Float] || element.isInstanceOf[BigDecimal] + case LONG => element.isInstanceOf[Long] || element.isInstanceOf[BigDecimal] + case INT => element.isInstanceOf[Int] || element.isInstanceOf[BigDecimal] + case SHORT => element.isInstanceOf[Short] || element.isInstanceOf[BigDecimal] + case BYTE => element.isInstanceOf[Byte] || element.isInstanceOf[BigDecimal] + case BOOLEAN => element.isInstanceOf[Boolean] + case _ => throw new RuntimeException(s"not supported data type: $element, $classType") + } + } + implicit val anyToInnerValLike = new CanInnerValLike[Any] { + override def toInnerVal(element: Any)(implicit encodingVer: String): InnerValLike = { + element match { + case i: InnerValLike => i + case s: String => stringToInnerValLike.toInnerVal(s) + case i: Int => intToInnerValLike.toInnerVal(i) + case l: Long => longToInnerValLike.toInnerVal(l) + case f: Float => floatToInnerValLike.toInnerVal(f) + case d: Double => doubleToInnerValLike.toInnerVal(d) + case b: BigDecimal => bigDecimalToInnerValLike.toInnerVal(b) + case b: Boolean => booleanToInnerValLike.toInnerVal(b) + case b: Array[Byte] => blobToInnerValLike.toInnerVal(b) + case _ => throw new RuntimeException(s"not supported element type: $element, ${element.getClass}") + } + } + } + implicit val innerValLikeToInnerValLike = new CanInnerValLike[InnerValLike] { + override def toInnerVal(element: InnerValLike)(implicit encodingVer: String): InnerValLike = element + } + implicit val objectToInnerValLike = new CanInnerValLike[Object] { + override def toInnerVal(element: Object)(implicit encodingVer: String): InnerValLike = { + anyToInnerValLike.toInnerVal(element.asInstanceOf[Any]) + } + } + + implicit val stringToInnerValLike = new CanInnerValLike[String] { + override def toInnerVal(element: String)(implicit encodingVer: String): InnerValLike = { + InnerVal.withStr(element, encodingVer) + } + } + implicit val longToInnerValLike = new CanInnerValLike[Long] { + override def toInnerVal(element: Long)(implicit encodingVer: String): InnerValLike = { + InnerVal.withLong(element, encodingVer) + } + } + implicit val intToInnerValLike = new CanInnerValLike[Int] { + override def toInnerVal(element: Int)(implicit encodingVer: String): InnerValLike = { + InnerVal.withInt(element, encodingVer) + } + } + implicit val floatToInnerValLike = new CanInnerValLike[Float] { + override def toInnerVal(element: Float)(implicit encodingVer: String): InnerValLike = { + InnerVal.withFloat(element, encodingVer) + } + } + implicit val doubleToInnerValLike = new CanInnerValLike[Double] { + override def toInnerVal(element: Double)(implicit encodingVer: String): InnerValLike = { + InnerVal.withDouble(element, encodingVer) + } + } + implicit val bigDecimalToInnerValLike = new CanInnerValLike[BigDecimal] { + override def toInnerVal(element: BigDecimal)(implicit encodingVer: String): InnerValLike = { + InnerVal.withNumber(element, encodingVer) + } + } + implicit val booleanToInnerValLike = new CanInnerValLike[Boolean] { + override def toInnerVal(element: Boolean)(implicit encodingVer: String): InnerValLike = { + InnerVal.withBoolean(element, encodingVer) + } + } + implicit val blobToInnerValLike = new CanInnerValLike[Array[Byte]] { + override def toInnerVal(element: Array[Byte])(implicit encodingVer: String): InnerValLike = { + InnerVal.withBlob(element, encodingVer) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala index 34f4d2c..f58b192 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala @@ -133,6 +133,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { queryOption = QueryOption(groupBy = GroupBy(props, 100)) ) + test("query with defaultValue") { // ref: edges from initTestData() @@ -436,6 +437,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { // test parent With select fields var result = TestUtil.getEdgesSync(queryParents(src)) + println(s"$result") var parents = (result \ "results").as[Seq[JsValue]] var ret = parents.forall { edge => val parentEdges = (edge \ "parents").as[Seq[JsValue]] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala index 042dce2..d70a08b 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala @@ -180,9 +180,9 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val grandParentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner) val parentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner, - parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0, grandParentEdge.label))) + parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0, grandParentEdge.innerLabel))) val edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner, - parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.label))) + parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.innerLabel))) println(edge.toString) println(parentEdge.toString) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala index 247cd07..cca3a59 100644 --- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala @@ -54,7 +54,7 @@ object CounterEtlFunctions extends Logging { filterOps.contains(x.op) } } yield { - val label = edge.label + val label = edge.innerLabel val labelName = label.label val tgtService = label.tgtColumn.service.serviceName val tgtId = edge.tgtVertex.innerId.toString() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index 835cc72..b1635fb 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -54,7 +54,7 @@ object EdgeController extends Controller { case v: Vertex => enqueue(kafkaTopic, graphElem, tsv) case e: Edge => - e.label.extraOptions.get("walLog") match { + e.innerLabel.extraOptions.get("walLog") match { case None => enqueue(kafkaTopic, e, tsv) case Some(walLogOpt) => @@ -94,7 +94,7 @@ object EdgeController extends Controller { results.get.zip(elements).map { case (false, (e: Edge, tsv: String)) => val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){ - toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.label), e.labelWithDir.dir, e.ts) + toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts) } else{ Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv))) }
