passed s2tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/87394b9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/87394b9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/87394b9f Branch: refs/heads/master Commit: 87394b9f7b7e241642201460a460ba2403a0fb99 Parents: 7413aad Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 3 15:28:08 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Nov 3 15:28:08 2017 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/QueryParam.scala | 10 +-- .../org/apache/s2graph/core/QueryResult.scala | 2 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 68 +++++++++----------- .../org/apache/s2graph/core/S2EdgeLike.scala | 54 +++++++++++++--- .../scala/org/apache/s2graph/core/S2Graph.scala | 50 +++++++------- .../org/apache/s2graph/core/S2Property.scala | 2 +- .../org/apache/s2graph/core/S2VertexLike.scala | 2 +- .../s2graph/core/index/IndexProvider.scala | 12 ++-- .../s2graph/core/parsers/WhereParser.scala | 34 +++++----- .../s2graph/core/rest/RequestParser.scala | 4 +- .../apache/s2graph/core/storage/Storage.scala | 10 +-- .../apache/s2graph/core/storage/StorageIO.scala | 20 +++--- .../s2graph/core/storage/StorageReadable.scala | 6 +- .../storage/WriteWriteConflictResolver.scala | 34 +++++----- .../hbase/AsynchbaseStorageReadable.scala | 6 +- .../tall/IndexEdgeDeserializable.scala | 17 +++-- .../wide/IndexEdgeDeserializable.scala | 12 ++-- .../tall/SnapshotEdgeSerializable.scala | 2 +- .../wide/SnapshotEdgeSerializable.scala | 2 +- .../s2graph/core/parsers/WhereParserTest.scala | 2 +- .../core/tinkerpop/S2GraphProvider.scala | 1 + .../rest/play/controllers/EdgeController.scala | 6 +- 22 files changed, 195 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 2e8d1f4..e98ef37 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -163,7 +163,7 @@ case class EdgeTransformer(jsValue: JsValue) { } } - def toInnerValOpt(queryParam: QueryParam, edge: S2Edge, fieldName: String): Option[InnerValLike] = { + def toInnerValOpt(queryParam: QueryParam, edge: S2EdgeLike, fieldName: String): Option[InnerValLike] = { fieldName match { case LabelMeta.to.name => Option(edge.tgtVertex.innerId) case LabelMeta.from.name => Option(edge.srcVertex.innerId) @@ -171,7 +171,7 @@ case class EdgeTransformer(jsValue: JsValue) { } } - def transform(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = { + def transform(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = { if (isDefault) Seq(edge) else { val edges = for { @@ -311,7 +311,7 @@ case class QueryParam(labelName: String, CanInnerValLike.anyToInnerValLike.toInnerVal(id)(label.tgtColumnWithDir(dir).schemaVersion) } - def buildInterval(edgeOpt: Option[S2Edge]) = intervalOpt match { + def buildInterval(edgeOpt: Option[S2EdgeLike]) = intervalOpt match { case None => Array.empty[Byte] -> Array.empty[Byte] case Some(interval) => val (froms, tos) = interval @@ -359,7 +359,7 @@ case class QueryParam(labelName: String, Bytes.add(bytes, optionalCacheKey) } - private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2Edge]): Seq[(LabelMeta, InnerValLike)] = { + private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike]): Seq[(LabelMeta, InnerValLike)] = { kvs.map { case (propKey, propValJs) => propValJs match { case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") => @@ -392,7 +392,7 @@ case class QueryParam(labelName: String, } } - def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2Edge] = None) = { + def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike] = None) = { val fromInnerVal = convertToInner(froms, edgeOpt) val toInnerVal = convertToInner(tos, edgeOpt) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index 7506b40..9bd3cdb 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -91,7 +91,7 @@ object WithScore { } } -case class EdgeWithScore(edge: S2Edge, +case class EdgeWithScore(edge: S2EdgeLike, score: Double, label: Label, orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 97abd26..3529991 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -71,7 +71,7 @@ case class SnapshotEdge(graph: S2Graph, op: Byte, version: Long, private val propsWithTs: Props, - pendingEdgeOpt: Option[S2Edge], + pendingEdgeOpt: Option[S2EdgeLike], statusCode: Byte = 0, lockTs: Option[Long], tsInnerValOpt: Option[InnerValLike] = None) { @@ -89,7 +89,7 @@ case class SnapshotEdge(graph: S2Graph, def allPropsDeleted = S2Edge.allPropsDeleted(propsWithTs) - def toEdge: S2Edge = { + def toEdge: S2EdgeLike = { S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) @@ -243,7 +243,7 @@ case class IndexEdge(graph: S2Graph, } yield meta.name -> jsValue - def toEdge: S2Edge = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) + def toEdge: S2EdgeLike = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) // only for debug def toLogString() = { @@ -310,11 +310,11 @@ case class S2Edge(override val innerGraph: S2Graph, var version: Long = System.currentTimeMillis(), override val propsWithTs: Props = S2Edge.EmptyProps, override val parentEdges: Seq[EdgeWithScore] = Nil, - override val originalEdgeOpt: Option[S2Edge] = None, - override val pendingEdgeOpt: Option[S2Edge] = None, + override val originalEdgeOpt: Option[S2EdgeLike] = None, + override val pendingEdgeOpt: Option[S2EdgeLike] = None, override val statusCode: Byte = 0, override val lockTs: Option[Long] = None, - var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike with GraphElement { + var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike { // if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) @@ -335,10 +335,10 @@ case class S2Edge(override val innerGraph: S2Graph, // } - def toLogString: String = { - // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t") - } +// def toLogString: String = { +// // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) +// List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t") +// } override def hashCode(): Int = { id().hashCode() @@ -455,7 +455,7 @@ object S2Edge { type State = Map[LabelMeta, InnerValLikeWithTs] type PropsPairWithTs = (State, State, Long, String) type MergeState = PropsPairWithTs => (State, Boolean) - type UpdateFunc = (Option[S2Edge], S2Edge, MergeState) + type UpdateFunc = (Option[S2EdgeLike], S2EdgeLike, MergeState) def EmptyProps = new java.util.HashMap[String, S2Property[_]] def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs] @@ -490,7 +490,7 @@ object S2Edge { def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = { state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) } } - def fillPropsWithTs(edge: S2Edge, state: State): Unit = { + def fillPropsWithTs(edge: S2EdgeLike, state: State): Unit = { state.foreach { case (k, v) => edge.propertyInner(k.name, v.innerVal.value, v.ts) } } @@ -500,7 +500,7 @@ object S2Edge { }.toMap } - def stateToProps(edge: S2Edge, state: State): Props = { + def stateToProps(edge: S2EdgeLike, state: State): Props = { state.foreach { case (k, v) => edge.propertyInner(k.name, v.innerVal.value, v.ts) } @@ -533,7 +533,7 @@ object S2Edge { ret } - def buildDeleteBulk(invertedEdge: Option[S2Edge], requestEdge: S2Edge): (S2Edge, EdgeMutate) = { + def buildDeleteBulk(invertedEdge: Option[S2EdgeLike], requestEdge: S2EdgeLike): (S2EdgeLike, EdgeMutate) = { // assert(invertedEdge.isEmpty) // assert(requestEdge.op == GraphUtil.operations("delete")) @@ -543,7 +543,7 @@ object S2Edge { (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted)) } - def buildOperation(invertedEdge: Option[S2Edge], requestEdges: Seq[S2Edge]): (S2Edge, EdgeMutate) = { + def buildOperation(invertedEdge: Option[S2EdgeLike], requestEdges: Seq[S2EdgeLike]): (S2EdgeLike, EdgeMutate) = { // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}") // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") val oldPropsWithTs = @@ -551,21 +551,21 @@ object S2Edge { else propsToState(invertedEdge.get.propsWithTs) val funcs = requestEdges.map { edge => - if (edge.op == GraphUtil.operations("insert")) { + if (edge.getOp() == GraphUtil.operations("insert")) { edge.innerLabel.consistencyLevel match { case "strong" => S2Edge.mergeUpsert _ case _ => S2Edge.mergeInsertBulk _ } - } else if (edge.op == GraphUtil.operations("insertBulk")) { + } else if (edge.getOp() == GraphUtil.operations("insertBulk")) { S2Edge.mergeInsertBulk _ - } else if (edge.op == GraphUtil.operations("delete")) { + } else if (edge.getOp() == GraphUtil.operations("delete")) { edge.innerLabel.consistencyLevel match { case "strong" => S2Edge.mergeDelete _ case _ => throw new RuntimeException("not supported") } } - else if (edge.op == GraphUtil.operations("update")) S2Edge.mergeUpdate _ - else if (edge.op == GraphUtil.operations("increment")) S2Edge.mergeIncrement _ + else if (edge.getOp() == GraphUtil.operations("update")) S2Edge.mergeUpdate _ + else if (edge.getOp() == GraphUtil.operations("increment")) S2Edge.mergeIncrement _ else throw new RuntimeException(s"not supported operation on edge: $edge") } @@ -587,7 +587,7 @@ object S2Edge { } val requestTs = requestEdge.ts /* version should be monotoniously increasing so our RPC mutation should be applied safely */ - val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs) + val newVersion = invertedEdge.map(e => e.getVersion() + incrementVersion).getOrElse(requestTs) val maxTs = prevPropsWithTs.map(_._2.ts).max val newTs = if (maxTs > requestTs) maxTs else requestTs val propsWithTs = prevPropsWithTs ++ @@ -597,14 +597,14 @@ object S2Edge { // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") // logger.error(s"$propsWithTs") - val newEdge = requestEdge.copy(propsWithTs = EmptyProps) - fillPropsWithTs(newEdge, propsWithTs) + val newEdge =requestEdge.copyEdgeWithState(propsWithTs) + (newEdge, edgeMutate) } } - def buildMutation(snapshotEdgeOpt: Option[S2Edge], - requestEdge: S2Edge, + def buildMutation(snapshotEdgeOpt: Option[S2EdgeLike], + requestEdge: S2EdgeLike, newVersion: Long, oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs], newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = { @@ -615,14 +615,14 @@ object S2Edge { } else { val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq) val newOp = snapshotEdgeOpt match { - case None => requestEdge.op + case None => requestEdge.getOp() case Some(old) => val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max - if (oldMaxTs > requestEdge.ts) old.op - else requestEdge.op + if (oldMaxTs > requestEdge.ts) old.getOp() + else requestEdge.getOp() } - val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs) + val newSnapshotEdge = requestEdge.copyOp(newOp).copyVersion(newVersion).copyEdgeWithState(newPropsWithTs) val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge) // delete request must always update snapshot. @@ -631,8 +631,8 @@ object S2Edge { EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt) } else { val edgesToDelete = snapshotEdgeOpt match { - case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") => - snapshotEdge.copy(op = GraphUtil.defaultOpByte) + case Some(snapshotEdge) if snapshotEdge.getOp() != GraphUtil.operations("delete") => + snapshotEdge.copyOp(GraphUtil.defaultOpByte) .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } case _ => Nil } @@ -640,11 +640,7 @@ object S2Edge { val edgesToInsert = if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil else { - val newEdge = requestEdge.copy( - version = newVersion, - propsWithTs = S2Edge.EmptyProps, - op = GraphUtil.defaultOpByte - ) + val newEdge = requestEdge.copyOp(GraphUtil.defaultOpByte).copyVersion(newVersion).copyEdgeWithState(S2Edge.EmptyState) newPropsWithTs.foreach { case (k, v) => newEdge.propertyInner(k.name, v.innerVal.value, v.ts) } newEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala index 04ed7ab..bb58554 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala @@ -14,7 +14,7 @@ import play.api.libs.json.Json import scala.concurrent.Await import scala.collection.JavaConverters._ -trait S2EdgeLike extends Edge { +trait S2EdgeLike extends Edge with GraphElement { this: S2Edge => val innerGraph: S2Graph @@ -27,8 +27,8 @@ trait S2EdgeLike extends Edge { // var version: Long = System.currentTimeMillis() val propsWithTs: Props = S2Edge.EmptyProps val parentEdges: Seq[EdgeWithScore] = Nil - val originalEdgeOpt: Option[S2Edge] = None - val pendingEdgeOpt: Option[S2Edge] = None + val originalEdgeOpt: Option[S2EdgeLike] = None + val pendingEdgeOpt: Option[S2EdgeLike] = None val statusCode: Byte = 0 val lockTs: Option[Long] = None // var tsInnerValOpt: Option[InnerValLike] = None @@ -48,6 +48,13 @@ trait S2EdgeLike extends Edge { lazy val labelName = innerLabel.label lazy val direction = GraphUtil.fromDirection(dir) + def getOp(): Byte = op + def setOp(newOp: Byte): Unit = op = newOp + def getVersion(): Long = version + def setVersion(newVersion: Long): Unit = version = newVersion + def getTsInnerValOpt(): Option[InnerValLike] = tsInnerValOpt + def setTsInnerValOpt(newTsInnerValOpt: Option[InnerValLike]): Unit = tsInnerValOpt = newTsInnerValOpt + def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) @@ -196,6 +203,11 @@ trait S2EdgeLike extends Edge { S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) } + def updateOriginalEdgeOpt(newOriginalEdgeOpt: S2EdgeLike): S2EdgeLike = { + S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, parentEdges, + Option(newOriginalEdgeOpt), pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + } + def rank(r: RankParam): Double = if (r.keySeqAndWeights.size <= 0) 1.0f else { @@ -225,12 +237,12 @@ trait S2EdgeLike extends Edge { version: Long = version, propsWithTs: State = S2Edge.propsToState(this.propsWithTs), parentEdges: Seq[EdgeWithScore] = parentEdges, - originalEdgeOpt: Option[S2Edge] = originalEdgeOpt, - pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt, + originalEdgeOpt: Option[S2EdgeLike] = originalEdgeOpt, + pendingEdgeOpt: Option[S2EdgeLike] = pendingEdgeOpt, statusCode: Byte = statusCode, lockTs: Option[Long] = lockTs, tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, - ts: Long = ts): S2Edge = { + ts: Long = ts): S2EdgeLike = { val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) S2Edge.fillPropsWithTs(edge, propsWithTs) @@ -238,19 +250,39 @@ trait S2EdgeLike extends Edge { edge } - def copyEdgeWithState(state: State, ts: Long): S2Edge = { + def copyEdgeWithState(state: State, ts: Long): S2EdgeLike = { val newEdge = copy(propsWithTs = S2Edge.EmptyProps) S2Edge.fillPropsWithTs(newEdge, state) newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts) newEdge } - def copyEdgeWithState(state: State): S2Edge = { + def copyEdgeWithState(state: State): S2EdgeLike = { val newEdge = copy(propsWithTs = S2Edge.EmptyProps) S2Edge.fillPropsWithTs(newEdge, state) newEdge } + def copyOp(newOp: Byte): S2EdgeLike = { + copy(op = newOp) + } + + def copyVersion(newVersion: Long): S2EdgeLike = { + copy(version = newVersion) + } + + def copyParentEdges(parents: Seq[EdgeWithScore]): S2EdgeLike = { + copy(parentEdges = parents) + } + + def copyStatusCode(newStatusCode: Byte): S2EdgeLike = { + copy(statusCode = newStatusCode) + } + + def copyLockTs(newLockTs: Option[Long]): S2EdgeLike = { + copy(lockTs = newLockTs) + } + def vertices(direction: Direction): util.Iterator[structure.Vertex] = { val arr = new util.ArrayList[Vertex]() @@ -373,7 +405,13 @@ trait S2EdgeLike extends Edge { } } + def toLogString: String = { + // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) + List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t") + } + private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) = if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 90190cf..8bb95fc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -201,13 +201,13 @@ object S2Graph { } /** common methods for filter out, transform, aggregate queryResult */ - def convertEdges(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = { + def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = { for { convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree } yield convertedEdge } - def processTimeDecay(queryParam: QueryParam, edge: S2Edge) = { + def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = { /* process time decay */ val tsVal = queryParam.timeDecay match { case None => 1.0 @@ -258,7 +258,7 @@ object S2Graph { } } - def toHashKey(queryParam: QueryParam, edge: S2Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { + def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = { val src = edge.srcVertex.innerId.hashCode() val tgt = edge.tgtVertex.innerId.hashCode() val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) @@ -435,7 +435,7 @@ object S2Graph { val tsVal = processTimeDecay(queryParam, edge) val newScore = degreeScore + score // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge - val newEdge = edge.copy(parentEdges = parents) + val newEdge = edge.copyParentEdges(parents) edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) } @@ -971,7 +971,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def fallback = Future.successful(StepResult.Empty) - def checkEdges(edges: Seq[S2Edge]): Future[StepResult] = { + def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] = { val futures = for { edge <- edges } yield { @@ -1319,11 +1319,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[MutateResponse]] = { - val edgeBuffer = ArrayBuffer[(S2Edge, Int)]() + val edgeBuffer = ArrayBuffer[(S2EdgeLike, Int)]() val vertexBuffer = ArrayBuffer[(S2VertexLike, Int)]() elements.zipWithIndex.foreach { - case (e: S2Edge, idx: Int) => edgeBuffer.append((e, idx)) + case (e: S2EdgeLike, idx: Int) => edgeBuffer.append((e, idx)) case (v: S2VertexLike, idx: Int) => vertexBuffer.append((v, idx)) case any@_ => logger.error(s"Unknown type: ${any}") } @@ -1347,13 +1347,13 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) - def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[MutateResponse]] = { + def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val edgeWithIdxs = edges.zipWithIndex val (strongEdges, weakEdges) = edgeWithIdxs.partition { case (edge, idx) => val e = edge - e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk") + e.innerLabel.consistencyLevel == "strong" && e.getOp() != GraphUtil.operations("insertBulk") } val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => @@ -1365,7 +1365,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph /* multiple edges with weak consistency level will be processed as batch */ val mutations = edges.flatMap { edge => val (_, edgeUpdate) = - if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) + if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) else S2Edge.buildOperation(None, Seq(edge)) val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy) @@ -1380,7 +1380,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } Future.sequence(futures) } - val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") } + val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") } val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _) @@ -1404,7 +1404,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = { + private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[Boolean]] = { val edgeWithIdxs = _edges.zipWithIndex val grouped = edgeWithIdxs.groupBy { case (edge, idx) => @@ -1440,7 +1440,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } - private def mutateEdgesInner(storage: Storage)(edges: Seq[S2Edge], + private def mutateEdgesInner(storage: Storage)(edges: Seq[S2EdgeLike], checkConsistency: Boolean, withWait: Boolean): Future[MutateResponse] = { assert(edges.nonEmpty) @@ -1495,8 +1495,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph - def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { - def incrementCounts(storage: Storage)(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { + def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { + def incrementCounts(storage: Storage)(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { val futures = for { edge <- edges } yield { @@ -1523,7 +1523,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[MutateResponse] = { + def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { val label = edge.innerLabel val storage = getStorage(label) @@ -1571,11 +1571,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph toVertex(GraphUtil.split(s)) } - def toEdge(s: String): Option[S2Edge] = { + def toEdge(s: String): Option[S2EdgeLike] = { toEdge(GraphUtil.split(s)) } - def toEdge(parts: Array[String]): Option[S2Edge] = Try { + def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try { val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] val tempDirection = if (parts.length >= 8) parts(7) else "out" @@ -1605,7 +1605,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph direction: String, props: Map[String, Any] = Map.empty, ts: Long = System.currentTimeMillis(), - operation: String = "insert"): S2Edge = { + operation: String = "insert"): S2EdgeLike = { val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn @@ -1649,7 +1649,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph vertex } - def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = { + def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = { val srcVertex = queryRequest.vertex val queryParam = queryRequest.queryParam val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt @@ -1710,11 +1710,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph version: Long = System.currentTimeMillis(), propsWithTs: S2Edge.State, parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[S2Edge] = None, - pendingEdgeOpt: Option[S2Edge] = None, + originalEdgeOpt: Option[S2EdgeLike] = None, + pendingEdgeOpt: Option[S2EdgeLike] = None, statusCode: Byte = 0, lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None): S2Edge = { + tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = { val edge = S2Edge( this, srcVertex, @@ -1758,7 +1758,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph op: Byte, version: Long, propsWithTs: S2Edge.State, - pendingEdgeOpt: Option[S2Edge], + pendingEdgeOpt: Option[S2EdgeLike], statusCode: Byte = 0, lockTs: Option[Long], tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = { @@ -1874,7 +1874,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = { val s2EdgeIds = edgeIds.collect { - case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId] + case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId] case id: EdgeId => id case s: String => EdgeId.fromString(s) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala index e86c17f..50b94de 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala @@ -102,7 +102,7 @@ object S2Property { } } -case class S2Property[V](element: S2Edge, +case class S2Property[V](element: S2EdgeLike, labelMeta: LabelMeta, key: String, v: V, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala index 5a8f722..b88c18d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala @@ -71,7 +71,7 @@ trait S2VertexLike extends Vertex with GraphElement { val arr = new util.ArrayList[Vertex]() edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] { override def accept(edge: Edge): Unit = { - val s2Edge = edge.asInstanceOf[S2Edge] + val s2Edge = edge.asInstanceOf[S2EdgeLike] s2Edge.direction match { case "out" => arr.add(edge.inVertex()) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index 098d0b4..e5005b7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -29,7 +29,7 @@ import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} import org.apache.lucene.search.IndexSearcher import org.apache.lucene.store.{BaseDirectory, RAMDirectory} import org.apache.s2graph.core.io.Conversions -import org.apache.s2graph.core.{EdgeId, S2Edge, S2Vertex, S2VertexLike} +import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.types.{InnerValLike, VertexId} import org.apache.s2graph.core.utils.logger @@ -130,8 +130,8 @@ trait IndexProvider { def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] - def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] - def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] + def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] + def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] def shutdown(): Unit } @@ -179,7 +179,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { } } - private def toDocument(globalIndex: GlobalIndex, edge: S2Edge): Option[Document] = { + private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = { val props = edge.propsWithTs.asScala val exist = props.exists(t => globalIndex.propNamesSet(t._1)) if (!exist) None @@ -222,7 +222,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { vertices.map(_ => true) } - override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = { + override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = { val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) globalIndexOptions.map { globalIndex => @@ -316,5 +316,5 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices)) - override def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) + override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index d754bb7..a0d56b2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException} import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.types.InnerValLike -import org.apache.s2graph.core.{S2Edge, GraphUtil} +import org.apache.s2graph.core.{GraphUtil, S2Edge, S2EdgeLike} import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.utils.logger @@ -33,7 +33,7 @@ import scala.util.parsing.combinator.JavaTokenParsers trait ExtractValue { val parent = "_parent." - def propToInnerVal(edge: S2Edge, key: String) = { + def propToInnerVal(edge: S2EdgeLike, key: String) = { val (propKey, parentEdge) = findParentEdge(edge, key) val label = parentEdge.innerLabel @@ -47,7 +47,7 @@ trait ExtractValue { } } - def valueToCompare(edge: S2Edge, key: String, value: String) = { + def valueToCompare(edge: S2EdgeLike, key: String, value: String) = { val label = edge.innerLabel if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value) else { @@ -65,11 +65,11 @@ trait ExtractValue { } @tailrec - private def findParent(edge: S2Edge, depth: Int): S2Edge = + private def findParent(edge: S2EdgeLike, depth: Int): S2EdgeLike = if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1) else edge - private def findParentEdge(edge: S2Edge, key: String): (String, S2Edge) = { + private def findParentEdge(edge: S2EdgeLike, key: String): (String, S2EdgeLike) = { if (!key.startsWith(parent)) (key, edge) else { val split = key.split(parent) @@ -88,9 +88,9 @@ trait Clause extends ExtractValue { def or(otherField: Clause): Clause = Or(this, otherField) - def filter(edge: S2Edge): Boolean + def filter(edge: S2EdgeLike): Boolean - def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2Edge): Boolean = { + def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2EdgeLike): Boolean = { val propValue = propToInnerVal(edge, propKey) val compValue = valueToCompare(edge, propKey, value) @@ -105,20 +105,20 @@ object Where { } } case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) { - def filter(edge: S2Edge) = + def filter(edge: S2EdgeLike) = if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity) } case class Gt(propKey: String, value: String) extends Clause { - override def filter(edge: S2Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge) + override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ > _)(propKey, value)(edge) } case class Lt(propKey: String, value: String) extends Clause { - override def filter(edge: S2Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge) + override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ < _)(propKey, value)(edge) } case class Eq(propKey: String, value: String) extends Clause { - override def filter(edge: S2Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge) + override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ == _)(propKey, value)(edge) } case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause { @@ -144,7 +144,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e toInnerVal(value, dataType, label.schemaVersion) } - override def filter(edge: S2Edge): Boolean = { + override def filter(edge: S2EdgeLike): Boolean = { if (edge.dir == GraphUtil.directions("in")) { val propVal = propToInnerVal(edge, propKey) innerValLikeLsIn.contains(propVal) @@ -156,7 +156,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e } case class IN(propKey: String, values: Set[String]) extends Clause { - override def filter(edge: S2Edge): Boolean = { + override def filter(edge: S2EdgeLike): Boolean = { val propVal = propToInnerVal(edge, propKey) values.exists { value => valueToCompare(edge, propKey, value) == propVal @@ -165,7 +165,7 @@ case class IN(propKey: String, values: Set[String]) extends Clause { } case class Between(propKey: String, minValue: String, maxValue: String) extends Clause { - override def filter(edge: S2Edge): Boolean = { + override def filter(edge: S2EdgeLike): Boolean = { val propVal = propToInnerVal(edge, propKey) val minVal = valueToCompare(edge, propKey, minValue) val maxVal = valueToCompare(edge, propKey, maxValue) @@ -175,15 +175,15 @@ case class Between(propKey: String, minValue: String, maxValue: String) extends } case class Not(self: Clause) extends Clause { - override def filter(edge: S2Edge) = !self.filter(edge) + override def filter(edge: S2EdgeLike) = !self.filter(edge) } case class And(left: Clause, right: Clause) extends Clause { - override def filter(edge: S2Edge) = left.filter(edge) && right.filter(edge) + override def filter(edge: S2EdgeLike) = left.filter(edge) && right.filter(edge) } case class Or(left: Clause, right: Clause) extends Clause { - override def filter(edge: S2Edge) = left.filter(edge) || right.filter(edge) + override def filter(edge: S2EdgeLike) = left.filter(edge) || right.filter(edge) } object WhereParser { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 55b6e12..6afbd87 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -548,12 +548,12 @@ class RequestParser(graph: S2Graph) { elementsWithTsv } - def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = { + def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2EdgeLike, String)] = { val jsValues = toJsValues(jsValue) jsValues.flatMap(toEdgeWithTsv(_, operation)) } - private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = { + private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2EdgeLike, String)] = { val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil) val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 01dd128..e6075ec 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -93,13 +93,13 @@ abstract class Storage(val graph: S2Graph, def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = io.buildIncrementsCountAsync(indexedEdge, amount) - def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = + def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] = io.buildVertexPutsAsync(edge) def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = io.snapshotEdgeMutations(edgeMutate) - def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = + def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] = io.buildDegreePuts(edge, degreeVal) def buildPutsAll(vertex: S2VertexLike): Seq[SKeyValue] = @@ -121,15 +121,15 @@ abstract class Storage(val graph: S2Graph, def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = fetcher.fetchVertices(vertices) - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll() + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = fetcher.fetchEdgesAll() def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll() - def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = + def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = fetcher.fetchSnapshotEdgeInner(edge) /** Conflict Resolver **/ - def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = + def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt) /** Management **/ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index 4014b6d..d0a59b2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -32,8 +32,8 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { /** Parsing Logic: parse from kv from Storage into Edge */ def toEdge[K: CanSKeyValue](kv: K, queryRequest: QueryRequest, - cacheElementOpt: Option[S2Edge], - parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { + cacheElementOpt: Option[S2EdgeLike], + parentEdges: Seq[EdgeWithScore]): Option[S2EdgeLike] = { logger.debug(s"toEdge: $kv") try { @@ -41,7 +41,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { val queryParam = queryRequest.queryParam val schemaVer = queryParam.label.schemaVersion val indexEdgeOpt = serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt) - if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges)) + if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copyParentEdges(parentEdges)) else indexEdgeOpt } catch { case ex: Exception => @@ -54,7 +54,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { queryRequest: QueryRequest, cacheElementOpt: Option[SnapshotEdge] = None, isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { + parentEdges: Seq[EdgeWithScore]): Option[S2EdgeLike] = { // logger.debug(s"SnapshottoEdge: $kv") val queryParam = queryRequest.queryParam val schemaVer = queryParam.label.schemaVersion @@ -62,7 +62,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { if (isInnerCall) { snapshotEdgeOpt.flatMap { snapshotEdge => - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) + val edge = snapshotEdge.toEdge.copyParentEdges(parentEdges) if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) else None } @@ -70,7 +70,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { snapshotEdgeOpt.flatMap { snapshotEdge => if (snapshotEdge.allPropsDeleted) None else { - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) + val edge = snapshotEdge.toEdge.copyParentEdges(parentEdges) if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) else None } @@ -144,7 +144,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { } val tsVal = processTimeDecay(queryParam, edge) val newScore = degreeScore + score - EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label) + EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label) } val sampled = @@ -229,11 +229,11 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { } } - def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = { + def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] = { val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false) if (storeVertex) { - if (edge.op == GraphUtil.operations("delete")) + if (edge.getOp() == GraphUtil.operations("delete")) buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex) else serDe.vertexSerializer(edge.srcForVertex).toKeyValues ++ serDe.vertexSerializer(edge.tgtForVertex).toKeyValues @@ -242,7 +242,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { } } - def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = { + def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] = { edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts) val kvs = edge.edgesWithIndexValid.flatMap { indexEdge => serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index c3b38e8..44bd4dc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -39,16 +39,16 @@ trait StorageReadable { def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] - protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] + protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] - def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = { + def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = { val queryParam = QueryParam(labelName = edge.innerLabel.label, direction = GraphUtil.fromDirection(edge.labelWithDir.dir), tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index 227cfa7..854fc18 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -52,7 +52,7 @@ class WriteWriteConflictResolver(graph: S2Graph, Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt) } - def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = { + def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = { if (tryNum >= MaxRetryNum) { edges.foreach { edge => logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") @@ -112,9 +112,9 @@ class WriteWriteConflictResolver(graph: S2Graph, } } - protected def commitUpdate(edges: Seq[S2Edge], + protected def commitUpdate(edges: Seq[S2EdgeLike], statusCode: Byte, - fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = { + fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = { // Future.failed(new PartialFailureException(edges.head, 0, "ahahah")) assert(edges.nonEmpty) // assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined) @@ -135,7 +135,7 @@ class WriteWriteConflictResolver(graph: S2Graph, assert(edgeMutate.newSnapshotEdge.isDefined) val lockTs = Option(System.currentTimeMillis()) - val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1) + val pendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(squashedEdge.ts + 1) val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) @@ -158,7 +158,7 @@ class WriteWriteConflictResolver(graph: S2Graph, Future.successful(true) } else { val lockTs = Option(System.currentTimeMillis()) - val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) + val pendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(snapshotEdge.getVersion() + 1) val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) @@ -182,7 +182,7 @@ class WriteWriteConflictResolver(graph: S2Graph, else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges) val lockTs = Option(System.currentTimeMillis()) - val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) + val newPendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(snapshotEdge.getVersion() + 1) val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge)) val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) @@ -222,7 +222,7 @@ class WriteWriteConflictResolver(graph: S2Graph, if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges else edges val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) - val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2 + val newVersion = fetchedSnapshotEdgeOpt.map(_.getVersion()).getOrElse(squashedEdge.ts) + 2 val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match { case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) @@ -246,8 +246,8 @@ class WriteWriteConflictResolver(graph: S2Graph, * @return */ protected def commitProcess(statusCode: Byte, - squashedEdge: S2Edge, - fetchedSnapshotEdgeOpt: Option[S2Edge], + squashedEdge: S2EdgeLike, + fetchedSnapshotEdgeOpt: Option[S2EdgeLike], lockSnapshotEdge: SnapshotEdge, releaseLockSnapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = { @@ -259,7 +259,7 @@ class WriteWriteConflictResolver(graph: S2Graph, } yield lockReleased } - case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason) + case class PartialFailureException(edge: S2EdgeLike, statusCode: Byte, failReason: String) extends NoStackException(failReason) protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") @@ -282,8 +282,8 @@ class WriteWriteConflictResolver(graph: S2Graph, * @return */ protected def acquireLock(statusCode: Byte, - squashedEdge: S2Edge, - fetchedSnapshotEdgeOpt: Option[S2Edge], + squashedEdge: S2EdgeLike, + fetchedSnapshotEdgeOpt: Option[S2EdgeLike], lockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = { if (statusCode >= 1) { logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}") @@ -334,7 +334,7 @@ class WriteWriteConflictResolver(graph: S2Graph, */ protected def releaseLock(predicate: Boolean, statusCode: Byte, - squashedEdge: S2Edge, + squashedEdge: S2EdgeLike, releaseLockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = { if (!predicate) { Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed.")) @@ -379,7 +379,7 @@ class WriteWriteConflictResolver(graph: S2Graph, */ protected def commitIndexEdgeMutations(predicate: Boolean, statusCode: Byte, - squashedEdge: S2Edge, + squashedEdge: S2EdgeLike, edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = { if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed.")) else { @@ -413,7 +413,7 @@ class WriteWriteConflictResolver(graph: S2Graph, */ protected def commitIndexEdgeDegreeMutations(predicate: Boolean, statusCode: Byte, - squashedEdge: S2Edge, + squashedEdge: S2EdgeLike, edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = { def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { @@ -445,8 +445,8 @@ class WriteWriteConflictResolver(graph: S2Graph, /** end of methods for consistency */ - def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge], - newEdge: S2Edge, edgeMutate: EdgeMutate) = + def mutateLog(snapshotEdgeOpt: Option[S2EdgeLike], edges: Seq[S2EdgeLike], + newEdge: S2EdgeLike, edgeMutate: EdgeMutate) = Seq("----------------------------------------------", s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index 92130f5..6be5f60 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -67,7 +67,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, * @param queryRequest * @return */ - private def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = { + private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike) = { import Serializable._ val queryParam = queryRequest.queryParam val label = queryParam.label @@ -178,7 +178,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, Left(get) } - override def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext) = { + override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = { val rpc = buildRequest(queryRequest, edge) fetchKeyValues(rpc) } @@ -221,7 +221,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, } } - override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = { + override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => val distinctLabels = labels.toSet val scan = AsynchbasePatcher.newScanner(client, hTableName) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 2501ed9..01f268b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -22,10 +22,9 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} -import org.apache.s2graph.core.storage.serde._ -import org.apache.s2graph.core.storage.serde.StorageDeserializable._ -import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.storage.CanSKeyValue +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ import org.apache.s2graph.core.types._ object IndexEdgeDeserializable{ @@ -33,13 +32,13 @@ object IndexEdgeDeserializable{ } class IndexEdgeDeserializable(graph: S2Graph, bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong, - tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2Edge] { + tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2EdgeLike] { type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], - cacheElementOpt: Option[S2Edge]): Option[S2Edge] = { + cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = { try { assert(_kvs.size == 1) @@ -79,8 +78,8 @@ class IndexEdgeDeserializable(graph: S2Graph, edge.propertyInner(LabelMeta.timestamp.name, version, version) edge.propertyInner(LabelMeta.degree.name, degreeVal, version) edge.tgtVertex = graph.newVertex(tgtVertexId, version) - edge.op = GraphUtil.defaultOpByte - edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge.setOp(GraphUtil.defaultOpByte) + edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer))) } else { // not degree edge val (idxPropsRaw, endAt) = @@ -150,8 +149,8 @@ class IndexEdgeDeserializable(graph: S2Graph, edge.propertyInner(LabelMeta.timestamp.name, tsVal, version) edge.tgtVertex = graph.newVertex(tgtVertexId, version) - edge.op = op - edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge.setOp(op) + edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer))) } Option(edge) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 68732ce..a7fe8a1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -28,13 +28,13 @@ import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.types._ class IndexEdgeDeserializable(graph: S2Graph, - bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] { + bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2EdgeLike] { type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], - cacheElementOpt: Option[S2Edge]): Option[S2Edge] = { + cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = { try { assert(_kvs.size == 1) @@ -68,8 +68,8 @@ class IndexEdgeDeserializable(graph: S2Graph, edge.propertyInner(LabelMeta.timestamp.name, version, version) edge.propertyInner(LabelMeta.degree.name, degreeVal, version) edge.tgtVertex = graph.newVertex(tgtVertexId, version) - edge.op = GraphUtil.defaultOpByte - edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge.setOp(GraphUtil.defaultOpByte) + edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer))) } else { pos = 0 val (idxPropsRaw, endAt) = bytesToProps(kv.qualifier, pos, schemaVer) @@ -123,8 +123,8 @@ class IndexEdgeDeserializable(graph: S2Graph, edge.propertyInner(LabelMeta.timestamp.name, tsVal, version) edge.tgtVertex = graph.newVertex(tgtVertexId, version) - edge.op = op - edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge.setOp(op) + edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer))) } Option(edge) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 5f00b48..24775e7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -52,7 +52,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) + val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp()) val versionBytes = Array.empty[Byte] val propsBytes = pendingEdge.serializePropsWithTs() val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index df84e86..44f2596 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -59,7 +59,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) + val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp()) val versionBytes = Array.empty[Byte] val propsBytes = pendingEdge.serializePropsWithTs() val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala index ad9299c..55658b9 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala @@ -37,7 +37,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val ts = System.currentTimeMillis() val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion) - def validate(label: Label)(edge: S2Edge)(sql: String)(expected: Boolean) = { + def validate(label: Label)(edge: S2EdgeLike)(sql: String)(expected: Boolean) = { def debug(whereOpt: Try[Where]) = { println("==================") println(s"$whereOpt") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index d8b2cfa..c7d474b 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -37,6 +37,7 @@ import scala.collection.JavaConverters._ object S2GraphProvider { val Implementation: Set[Class[_]] = Set( + classOf[S2EdgeLike], classOf[S2Edge], classOf[S2Vertex], classOf[S2VertexLike], http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index 101b331..2b31fcd 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -54,7 +54,7 @@ object EdgeController extends Controller { graphElem match { case v: S2VertexLike => enqueue(kafkaTopic, graphElem, tsv) - case e: S2Edge => + case e: S2EdgeLike => e.innerLabel.extraOptions.get("walLog") match { case None => enqueue(kafkaTopic, e, tsv) @@ -93,8 +93,8 @@ object EdgeController extends Controller { val result = s2.mutateElements(elements.map(_._1), true) result onComplete { results => results.get.zip(elements).map { - case (r: MutateResponse, (e: S2Edge, tsv: String)) if !r.isSuccess => - val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){ + case (r: MutateResponse, (e: S2EdgeLike, tsv: String)) if !r.isSuccess => + val kafkaMessages = if(e.getOp() == GraphUtil.operations("deleteAll")){ toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts) } else{ Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))
