Repository: incubator-s2graph Updated Branches: refs/heads/master f74c224ac -> 247b2cb9d
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala index 2d34c7a..1a85dba 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -54,7 +54,7 @@ object RestHandler { * Public API, only return Future.successful or Future.failed * Don't throw exception */ -class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { +class RestHandler(graph: S2Graph)(implicit ec: ExecutionContext) { import RestHandler._ val requestParser = new RequestParser(graph) @@ -172,7 +172,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { } def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None) - (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = { + (post: (S2Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = { def query(obj: JsValue): Future[JsValue] = { (obj \ "queries").asOpt[JsValue] match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index efe7a3d..59b7518 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -39,10 +39,10 @@ import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.hbase.util.Bytes -abstract class Storage[Q, R](val graph: Graph, +abstract class Storage[Q, 
R](val graph: S2Graph, val config: Config)(implicit ec: ExecutionContext) { import HBaseType._ - import Graph._ + import S2Graph._ val BackoffTimeout = graph.BackoffTimeout val MaxRetryNum = graph.MaxRetryNum @@ -100,7 +100,7 @@ abstract class Storage[Q, R](val graph: Graph, * @param vertex: vertex to serialize * @return serializer implementation */ - def vertexSerializer(vertex: Vertex): Serializable[Vertex] = new VertexSerializable(vertex) + def vertexSerializer(vertex: S2Vertex): Serializable[S2Vertex] = new VertexSerializable(vertex) /** * create deserializer that can parse stored CanSKeyValue into snapshotEdge. @@ -122,7 +122,7 @@ abstract class Storage[Q, R](val graph: Graph, snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parse stored CanSKeyValue into indexEdge. */ - val indexEdgeDeserializers: Map[String, Deserializable[Edge]] = Map( + val indexEdgeDeserializers: Map[String, Deserializable[S2Edge]] = Map( VERSION1 -> new IndexEdgeDeserializable(graph), VERSION2 -> new IndexEdgeDeserializable(graph), VERSION3 -> new IndexEdgeDeserializable(graph), @@ -133,7 +133,7 @@ abstract class Storage[Q, R](val graph: Graph, indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parser stored CanSKeyValue into vertex. */ - val vertexDeserializer: Deserializable[Vertex] = new VertexDeserializable(graph) + val vertexDeserializer: Deserializable[S2Vertex] = new VertexDeserializable(graph) /** @@ -195,7 +195,7 @@ abstract class Storage[Q, R](val graph: Graph, * @param queryRequest * @return */ - protected def buildRequest(queryRequest: QueryRequest, edge: Edge): Q + protected def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q /** * fetch IndexEdges for given queryParam in queryRequest. 
@@ -242,7 +242,7 @@ abstract class Storage[Q, R](val graph: Graph, * @param withWait * @return */ - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] + def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] /** * this method need to be called when client shutdown. this is responsible to cleanUp the resources @@ -276,9 +276,9 @@ abstract class Storage[Q, R](val graph: Graph, /** Public Interface */ - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { + def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { def fromResult(kvs: Seq[SKeyValue], - version: String): Option[Vertex] = { + version: String): Option[S2Vertex] = { if (kvs.isEmpty) None else vertexDeserializer.fromKeyValues(None, kvs, version, None) // .map(S2Vertex(graph, _)) @@ -286,7 +286,7 @@ abstract class Storage[Q, R](val graph: Graph, val futures = vertices.map { vertex => val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), queryParam) + val q = Query.toQuery(Seq(vertex), Seq(queryParam)) val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) fetchVertexKeyValues(queryRequest).map { kvs => fromResult(kvs, vertex.serviceColumn.schemaVersion) @@ -297,7 +297,7 @@ abstract class Storage[Q, R](val graph: Graph, Future.sequence(futures).map { result => result.toList.flatten } } - def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { + def mutateStrongEdges(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = { val edgeWithIdxs = _edges.zipWithIndex val grouped = edgeWithIdxs.groupBy { case (edge, idx) => @@ -332,7 +332,7 @@ abstract class Storage[Q, R](val graph: Graph, } } - def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = { + def mutateVertex(vertex: S2Vertex, withWait: Boolean): Future[Boolean] = { if (vertex.op == GraphUtil.operations("delete")) { writeToStorage(vertex.hbaseZkAddr, 
vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) @@ -344,14 +344,14 @@ abstract class Storage[Q, R](val graph: Graph, } } - def mutateVertices(vertices: Seq[Vertex], + def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = { val futures = vertices.map { vertex => mutateVertex(vertex, withWait) } Future.sequence(futures) } - def mutateEdgesInner(edges: Seq[Edge], + def mutateEdgesInner(edges: Seq[S2Edge], checkConsistency: Boolean, withWait: Boolean): Future[Boolean] = { assert(edges.nonEmpty) @@ -360,7 +360,7 @@ abstract class Storage[Q, R](val graph: Graph, val zkQuorum = edges.head.innerLabel.hbaseZkAddr val futures = edges.map { edge => - val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge)) + val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) val mutations = indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) @@ -382,7 +382,7 @@ abstract class Storage[Q, R](val graph: Graph, Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt) } - def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = { + def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = { if (tryNum >= MaxRetryNum) { edges.foreach { edge => logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") @@ -442,9 +442,9 @@ abstract class Storage[Q, R](val graph: Graph, } } - protected def commitUpdate(edges: Seq[Edge], + protected def commitUpdate(edges: Seq[S2Edge], statusCode: Byte, - fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = { + fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = { // Future.failed(new PartialFailureException(edges.head, 0, "ahahah")) assert(edges.nonEmpty) // assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined) @@ -460,7 +460,7 @@ abstract class Storage[Q, R](val 
graph: Graph, * lock = (squashedEdge, pendingE) * releaseLock = (edgeMutate.newSnapshotEdge, None) */ - val (squashedEdge, edgeMutate) = Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) assert(edgeMutate.newSnapshotEdge.isDefined) @@ -482,7 +482,7 @@ abstract class Storage[Q, R](val graph: Graph, * lock = (snapshotEdge, pendingE) * releaseLock = (edgeMutate.newSnapshotEdge, None) */ - val (squashedEdge, edgeMutate) = Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) if (edgeMutate.newSnapshotEdge.isEmpty) { logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}") Future.successful(true) @@ -508,8 +508,8 @@ abstract class Storage[Q, R](val graph: Graph, */ logger.debug(s"${pendingEdge.toLogString} has been expired.") val (squashedEdge, edgeMutate) = - if (pendingEdge.ts == snapshotEdge.ts) Edge.buildOperation(None, pendingEdge +: edges) - else Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges) + if (pendingEdge.ts == snapshotEdge.ts) S2Edge.buildOperation(None, pendingEdge +: edges) + else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges) val lockTs = Option(System.currentTimeMillis()) val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) @@ -524,7 +524,7 @@ abstract class Storage[Q, R](val graph: Graph, * this can't be proceed so retry from re-fetch. * throw EX */ - val (squashedEdge, _) = Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + val (squashedEdge, _) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. 
me[${squashedEdge.ts}]")) } } @@ -551,7 +551,7 @@ abstract class Storage[Q, R](val graph: Graph, val _edges = if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges else edges - val (squashedEdge, edgeMutate) = Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) + val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2 val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match { case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) @@ -575,8 +575,8 @@ abstract class Storage[Q, R](val graph: Graph, * @return */ protected def commitProcess(statusCode: Byte, - squashedEdge: Edge, - fetchedSnapshotEdgeOpt:Option[Edge], + squashedEdge: S2Edge, + fetchedSnapshotEdgeOpt:Option[S2Edge], lockSnapshotEdge: SnapshotEdge, releaseLockSnapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate): Future[Boolean] = { @@ -588,7 +588,7 @@ abstract class Storage[Q, R](val graph: Graph, } yield lockReleased } - case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason) + case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason) protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") @@ -611,8 +611,8 @@ abstract class Storage[Q, R](val graph: Graph, * @return */ protected def acquireLock(statusCode: Byte, - squashedEdge: Edge, - fetchedSnapshotEdgeOpt: Option[Edge], + squashedEdge: S2Edge, + fetchedSnapshotEdgeOpt: Option[S2Edge], lockEdge: SnapshotEdge): Future[Boolean] = { if (statusCode >= 1) { logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}") @@ -663,7 +663,7 @@ 
abstract class Storage[Q, R](val graph: Graph, */ protected def releaseLock(predicate: Boolean, statusCode: Byte, - squashedEdge: Edge, + squashedEdge: S2Edge, releaseLockEdge: SnapshotEdge): Future[Boolean] = { if (!predicate) { Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed.")) @@ -708,7 +708,7 @@ abstract class Storage[Q, R](val graph: Graph, */ protected def commitIndexEdgeMutations(predicate: Boolean, statusCode: Byte, - squashedEdge: Edge, + squashedEdge: S2Edge, edgeMutate: EdgeMutate): Future[Boolean] = { if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed.")) else { @@ -742,7 +742,7 @@ abstract class Storage[Q, R](val graph: Graph, */ protected def commitIndexEdgeDegreeMutations(predicate: Boolean, statusCode: Byte, - squashedEdge: Edge, + squashedEdge: S2Edge, edgeMutate: EdgeMutate): Future[Boolean] = { def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { @@ -772,8 +772,8 @@ abstract class Storage[Q, R](val graph: Graph, /** end of methods for consistency */ - def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge], - newEdge: Edge, edgeMutate: EdgeMutate) = + def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge], + newEdge: S2Edge, edgeMutate: EdgeMutate) = Seq("----------------------------------------------", s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", @@ -796,17 +796,17 @@ abstract class Storage[Q, R](val graph: Graph, val edge = edgeWithScore.edge val score = edgeWithScore.score - val edgeSnapshot = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs())) + val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - val edgeForward = edge.copyEdge(propsWithTs = 
Edge.propsToState(edge.updatePropsWithTs())) + val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ buildIncrementsAsync(indexEdge, -1L) } /** reverted direction */ - val edgeRevert = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs())) + val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ buildIncrementsAsync(indexEdge, -1L) @@ -829,8 +829,8 @@ abstract class Storage[Q, R](val graph: Graph, /** Parsing Logic: parse from kv from Storage into Edge */ def toEdge[K: CanSKeyValue](kv: K, queryRequest: QueryRequest, - cacheElementOpt: Option[Edge], - parentEdges: Seq[EdgeWithScore]): Option[Edge] = { + cacheElementOpt: Option[S2Edge], + parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { logger.debug(s"toEdge: $kv") try { @@ -851,7 +851,7 @@ abstract class Storage[Q, R](val graph: Graph, queryRequest: QueryRequest, cacheElementOpt: Option[SnapshotEdge] = None, isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Option[Edge] = { + parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { // logger.debug(s"SnapshottoEdge: $kv") val queryParam = queryRequest.queryParam val schemaVer = queryParam.label.schemaVersion @@ -959,7 +959,7 @@ abstract class Storage[Q, R](val graph: Graph, /** End Of Parse Logic */ - protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): Edge = { + protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = { val srcVertex = queryRequest.vertex val queryParam = queryRequest.queryParam val tgtVertexIdOpt = 
queryParam.tgtVertexInnerIdOpt @@ -986,7 +986,7 @@ abstract class Storage[Q, R](val graph: Graph, } } - protected def fetchSnapshotEdgeInner(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = { + protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache * so use empty cacheKey. * */ @@ -994,7 +994,7 @@ abstract class Storage[Q, R](val graph: Graph, direction = GraphUtil.fromDirection(edge.labelWithDir.dir), tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), cacheTTLInMillis = -1) - val q = Query.toQuery(Seq(edge.srcVertex), queryParam) + val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) // val q = Query.toQuery(Seq(edge.srcVertex), queryParam) @@ -1097,15 +1097,15 @@ abstract class Storage[Q, R](val graph: Graph, } //TODO: ServiceColumn do not have durability property yet. 
- def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = { + def buildDeleteBelongsToId(vertex: S2Vertex): Seq[SKeyValue] = { val kvs = vertexSerializer(vertex).toKeyValues val kv = kvs.head vertex.belongLabelIds.map { id => - kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)), operation = SKeyValue.Delete) + kv.copy(qualifier = Bytes.toBytes(S2Vertex.toPropKey(id)), operation = SKeyValue.Delete) } } - def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = { + def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = { val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false) if (storeVertex) { @@ -1118,7 +1118,7 @@ abstract class Storage[Q, R](val graph: Graph, } } - def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = { + def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = { edge.property(LabelMeta.degree.name, degreeVal, edge.ts) val kvs = edge.edgesWithIndexValid.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) @@ -1127,7 +1127,7 @@ abstract class Storage[Q, R](val graph: Graph, kvs } - def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = { + def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = { vertex.op match { case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index b0287d5..93b2454 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -89,7 +89,7 @@ object AsynchbaseStorage { } -class AsynchbaseStorage(override val graph: Graph, +class AsynchbaseStorage(override val graph: S2Graph, override val config: Config)(implicit ec: ExecutionContext) extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) { @@ -242,7 +242,7 @@ class AsynchbaseStorage(override val graph: Graph, * @param queryRequest * @return Scanner or GetRequest with proper setup with StartKey, EndKey, RangeFilter. */ - override def buildRequest(queryRequest: QueryRequest, edge: Edge): AsyncRPC = { + override def buildRequest(queryRequest: QueryRequest, edge: S2Edge): AsyncRPC = { import Serializable._ val queryParam = queryRequest.queryParam val label = queryParam.label @@ -424,7 +424,7 @@ class AsynchbaseStorage(override val graph: Graph, * @param withWait * @return */ - override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { + override def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { val _client = client(withWait) val defers: Seq[Deferred[(Boolean, Long, Long)]] = for { @@ -517,9 +517,9 @@ class AsynchbaseStorage(override val graph: Graph, /** Asynchbase implementation override default getVertices to use future Cache */ - override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { + override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { def fromResult(kvs: Seq[SKeyValue], - version: String): Option[Vertex] = { + version: String): Option[S2Vertex] = { if (kvs.isEmpty) None else vertexDeserializer.fromKeyValues(None, kvs, version, None) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index e11a5f6..5549f4e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -29,10 +29,10 @@ import org.apache.s2graph.core._ import scala.collection.immutable object IndexEdgeDeserializable{ - def getNewInstance(graph: Graph) = new IndexEdgeDeserializable(graph) + def getNewInstance(graph: S2Graph) = new IndexEdgeDeserializable(graph) } -class IndexEdgeDeserializable(graph: Graph, - bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] { +class IndexEdgeDeserializable(graph: S2Graph, + bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] { import StorageDeserializable._ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) @@ -41,7 +41,7 @@ class IndexEdgeDeserializable(graph: Graph, override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], _kvs: Seq[T], schemaVer: String, - cacheElementOpt: Option[Edge]): Edge = { + cacheElementOpt: Option[S2Edge]): S2Edge = { assert(_kvs.size == 1) @@ -64,7 +64,7 @@ class IndexEdgeDeserializable(graph: Graph, val srcVertex = graph.newVertex(srcVertexId, version) //TODO: val edge = graph.newEdge(srcVertex, null, - label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState) + label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) var tsVal = version if (pos == kv.row.length) { 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 60f7d80..706d8cb 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -26,8 +26,8 @@ import org.apache.s2graph.core.types._ import org.apache.s2graph.core._ import scala.collection.immutable -class IndexEdgeDeserializable(graph: Graph, - bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] { +class IndexEdgeDeserializable(graph: S2Graph, + bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] { import StorageDeserializable._ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) @@ -68,7 +68,7 @@ class IndexEdgeDeserializable(graph: Graph, override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], _kvs: Seq[T], schemaVer: String, - cacheElementOpt: Option[Edge]): Edge = { + cacheElementOpt: Option[S2Edge]): S2Edge = { assert(_kvs.size == 1) // val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } @@ -85,7 +85,7 @@ class IndexEdgeDeserializable(graph: Graph, val srcVertex = graph.newVertex(srcVertexId, version) //TODO: val edge = graph.newEdge(srcVertex, null, - label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState) + label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) var tsVal = version val (idxPropsRaw, tgtVertexIdRaw, op, 
tgtVertexIdInQualifier, _) = http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala index 6c1906e..f4802c0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelIndex, LabelMe import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId} -import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex} +import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex} -class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEdge] { +class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[SnapshotEdge] { def statusCodeWithOp(byte: Byte): (Byte, Byte) = { val statusCode = byte >> 4 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala index 64b2e31..d3dec1e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} import org.apache.s2graph.core.types.TargetVertexId -import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex} +import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex} -class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEdge] { +class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[SnapshotEdge] { def statusCodeWithOp(byte: Byte): (Byte, Byte) = { val statusCode = byte >> 4 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala index 6e2311f..ee93505 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala @@ -19,20 +19,20 @@ package org.apache.s2graph.core.storage.serde.vertex -import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core.mysqls.{ColumnMeta, Label} import org.apache.s2graph.core.storage.StorageDeserializable._ import 
org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable} import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId} -import org.apache.s2graph.core.{Graph, QueryParam, Vertex} +import org.apache.s2graph.core.{S2Graph, QueryParam, S2Vertex} import scala.collection.mutable.ListBuffer -class VertexDeserializable(graph: Graph, - bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[Vertex] { +class VertexDeserializable(graph: S2Graph, + bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2Vertex] { def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], _kvs: Seq[T], version: String, - cacheElementOpt: Option[Vertex]): Vertex = { + cacheElementOpt: Option[S2Vertex]): S2Vertex = { val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } @@ -40,7 +40,7 @@ class VertexDeserializable(graph: Graph, val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) var maxTs = Long.MinValue - val propsMap = new collection.mutable.HashMap[Int, InnerValLike] + val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike] val belongLabelIds = new ListBuffer[Int] for { @@ -53,15 +53,18 @@ class VertexDeserializable(graph: Graph, val ts = kv.timestamp if (ts > maxTs) maxTs = ts - if (Vertex.isLabelId(propKey)) { - belongLabelIds += Vertex.toLabelId(propKey) + if (S2Vertex.isLabelId(propKey)) { + belongLabelIds += S2Vertex.toLabelId(propKey) } else { val v = kv.value val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) - propsMap += (propKey -> value) + val columnMeta = vertexId.column.metasMap(propKey) + propsMap += (columnMeta -> value) } } assert(maxTs != Long.MinValue) - graph.newVertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds) + val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) + S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) + vertex } } 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala index 77bbb87..1dbcd00 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala @@ -19,12 +19,12 @@ package org.apache.s2graph.core.storage.serde.vertex -import org.apache.s2graph.core.Vertex +import org.apache.s2graph.core.S2Vertex import org.apache.s2graph.core.storage.StorageSerializable._ import org.apache.s2graph.core.storage.{SKeyValue, Serializable} -import org.apache.s2graph.core.utils.logger +import scala.collection.JavaConverters._ -case class VertexSerializable(vertex: Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[Vertex] { +case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] { override val table = vertex.hbaseTableName.getBytes override val ts = vertex.ts @@ -38,8 +38,11 @@ case class VertexSerializable(vertex: Vertex, intToBytes: Int => Array[Byte] = i /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ override def toKeyValues: Seq[SKeyValue] = { val row = toRowKey - val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield intToBytes(k) -> v.bytes - val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] } + val base = for ((k, v) <- vertex.props.asScala ++ vertex.defaultProps.asScala) yield { + val columnMeta = v.columnMeta + intToBytes(columnMeta.seq) -> v.innerVal.bytes + } + val 
belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] } (base ++ belongsTo).map { case (qualifier, value) => SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) } toSeq http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala index d90cf8e..ea7aa41 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -268,6 +268,23 @@ trait CanInnerValLike[A] { object CanInnerValLike { implicit val encodingVer = "v2" + def castValue(element: Any, classType: String): Any = { + import InnerVal._ + element match { + case bd: BigDecimal => + classType match { + case DOUBLE => bd.doubleValue() + case FLOAT => bd.floatValue() + case LONG => bd.longValue() + case INT | "int" => bd.intValue() + case SHORT => bd.shortValue() + case BYTE => bd.byteValue() + case _ => throw new RuntimeException(s"not supported data type: $element, $classType") + } + case _ => element +// throw new RuntimeException(s"not supported data type: $element, ${element.getClass.getCanonicalName}, $classType") + } + } def validate(element: Any, classType: String): Boolean = { import InnerVal._ classType match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala index a949f3e..eb2d42a 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala @@ -51,7 +51,7 @@ object VertexId extends HBaseDeserializable { } } -class VertexId protected (val column: ServiceColumn, val innerId: InnerValLike) extends HBaseSerializable { +class VertexId (val column: ServiceColumn, val innerId: InnerValLike) extends HBaseSerializable { val storeHash: Boolean = true val storeColId: Boolean = true val colId = column.id.get http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala deleted file mode 100644 index 55b796d..0000000 --- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.core - -import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta} -import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} -import org.apache.s2graph.core.utils.logger -import org.scalatest.FunSuite -import play.api.libs.json.{JsObject, Json} - -class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { - import Edge._ - initTests() - - val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "is_blocked", 1.toByte, "true", "boolean") - val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "time", 3.toByte, "-1", "long") - -// test("toLogString") { -// val testServiceName = serviceNameV2 -// val testLabelName = labelNameV2 -// val bulkQueries = List( -// ("1445240543366", "update", "{\"is_blocked\":true}"), -// ("1445240543362", "insert", "{\"is_hidden\":false}"), -// ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), -// ("1445240543363", "delete", "{}"), -// ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) -// -// val (srcId, tgtId, labelName) = ("1", "2", testLabelName) -// -// val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { -// val properties = fromJsonToProperties(Json.parse(props).as[JsObject]) -// Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString -// }).mkString("\n") -// -// val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + testLabelName + -// "\",\"service\":\"" + testServiceName + "\"" -// val expected = Seq( -// Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_blocked\":true}"), -// Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false}"), -// Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false,\"weight\":10}"), -// Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + 
attachedProps + "}"), -// Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"time\":1,\"weight\":-10}") -// ).map(_.mkString("\t")).mkString("\n") -// -// assert(bulkEdge === expected) -// } - - test("buildOperation") { - val schemaVersion = "v2" - val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - - val snapshotEdge = None - val propsWithTs = Map(timestampProp) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - - val newVersion = 0L - - val newPropsWithTs = Map( - timestampProp, - testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) - ) - - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.isDefined) - assert(edgeMutate.edgesToInsert.nonEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: snapshotEdge: None with newProps") { - val schemaVersion = "v2" - val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - - val snapshotEdge = None - val propsWithTs = Map(timestampProp) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - - val newVersion = 0L - - val newPropsWithTs = Map( - timestampProp, - testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) - ) - - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) - 
logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.isDefined) - assert(edgeMutate.edgesToInsert.nonEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") { - val schemaVersion = "v2" - val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - - val snapshotEdge = None - val propsWithTs = Map(timestampProp) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - - val newVersion = 0L - - val newPropsWithTs = propsWithTs - - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.isEmpty) - assert(edgeMutate.edgesToInsert.isEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: All props older than snapshotEdge's LastDeletedAt") { - val schemaVersion = "v2" - val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - val oldPropsWithTs = Map( - timestampProp, - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val propsWithTs = Map( - timestampProp, - testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 2), - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) - - val snapshotEdge = 
Option(_snapshotEdge) - - - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - - val newVersion = 0L - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.nonEmpty) - assert(edgeMutate.edgesToInsert.isEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } - - test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") { - val schemaVersion = "v2" - val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) - val tgtVertex = srcVertex - - val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) - val oldPropsWithTs = Map( - timestampProp, - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val propsWithTs = Map( - timestampProp, - testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 4), - LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) - ) - - val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) - - val snapshotEdge = Option(_snapshotEdge) - - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) - - val newVersion = 0L - val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) - logger.info(edgeMutate.toLogString) - - assert(edgeMutate.newSnapshotEdge.nonEmpty) - assert(edgeMutate.edgesToInsert.nonEmpty) - assert(edgeMutate.edgesToDelete.isEmpty) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index cc08b62..d280570 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -34,14 +34,14 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { import TestUtil._ - var graph: Graph = _ + var graph: S2Graph = _ var parser: RequestParser = _ var management: Management = _ var config: Config = _ override def beforeAll = { config = ConfigFactory.load() - graph = new Graph(config)(ExecutionContext.Implicits.global) + graph = new S2Graph(config)(ExecutionContext.Implicits.global) management = new Management(graph) parser = new RequestParser(graph) initTestData() @@ -92,7 +92,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { } } - val vertexPropsKeys = List("age" -> "int", "im" -> "string") + val vertexPropsKeys = List("age" -> "integer", "im" -> "string") vertexPropsKeys.map { case (key, keyType) => Management.addVertexProp(testServiceName, testColumnName, key, keyType) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala new file mode 100644 index 0000000..3eb04ff --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.Integrate.tinkerpop + +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{S2Graph, TestCommonWithModels, S2Vertex} +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal +import org.apache.tinkerpop.gremlin.structure.{Edge, Vertex, T} +import org.scalatest.{FunSuite, Matchers} + + +class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { + + import scala.collection.JavaConversions._ + import scala.concurrent.ExecutionContext.Implicits.global + + initTests() + + val g = new S2Graph(config) + + def printEdges(edges: Seq[Edge]): Unit = { + edges.foreach { edge => + logger.debug(s"[FetchedEdge]: $edge") + } + } + + import scala.language.implicitConversions + + def newVertexId(id: Any, label: Label = labelV2) = g.newVertexId(label.srcService, label.srcColumn, id) + + def addVertex(id: AnyRef, label: Label = labelV2) = + g.addVertex(T.label, label.srcService.serviceName + S2Vertex.VertexLabelDelimiter + label.srcColumnName, + T.id, id).asInstanceOf[S2Vertex] + + val srcId = Long.box(20) + val range = (100 until 110) + testData(srcId, range) + + // val testProps = Seq( + // Prop("affinity_score", "0.0", DOUBLE), + // 
Prop("is_blocked", "false", BOOLEAN), + // Prop("time", "0", INT), + // Prop("weight", "0", INT), + // Prop("is_hidden", "true", BOOLEAN), + // Prop("phone_number", "xxx-xxx-xxxx", STRING), + // Prop("score", "0.1", FLOAT), + // Prop("age", "10", INT) + // ) + def testData(srcId: AnyRef, range: Range, label: Label = labelV2) = { + val src = addVertex(srcId) + + for { + i <- range + } { + val tgt = addVertex(Int.box(i)) + + src.addEdge(labelV2.label, tgt, + "age", Int.box(10), + "affinity_score", Double.box(0.1), + "is_blocked", Boolean.box(true), + "ts", Long.box(i)) + } + } + + test("test traversal.") { + val vertices = g.traversal().V(newVertexId(srcId)).out(labelV2.label).toSeq + + vertices.size should be(range.size) + range.reverse.zip(vertices).foreach { case (tgtId, vertex) => + val vertexId = g.newVertexId(labelV2.tgtService, labelV2.tgtColumn, tgtId) + val expectedId = g.newVertex(vertexId) + vertex.asInstanceOf[S2Vertex].innerId should be(expectedId.innerId) + } + } + + test("test traversal. limit 1") { + val vertexIdParams = Seq(newVertexId(srcId)) + val t: GraphTraversal[Vertex, Double] = g.traversal().V(vertexIdParams: _*).outE(labelV2.label).limit(1).values("affinity_score") + for { + affinityScore <- t + } { + logger.debug(s"$affinityScore") + affinityScore should be (0.1) + } + } + test("test traversal. 3") { + + val l = label + + val srcA = addVertex(Long.box(1), l) + val srcB = addVertex(Long.box(2), l) + val srcC = addVertex(Long.box(3), l) + + val tgtA = addVertex(Long.box(101), l) + val tgtC = addVertex(Long.box(103), l) + + srcA.addEdge(l.label, tgtA) + srcA.addEdge(l.label, tgtC) + tgtC.addEdge(l.label, srcB) + tgtA.addEdge(l.label, srcC) + + val vertexIdParams = Seq(srcA.id) + val vertices = g.traversal().V(vertexIdParams: _*).out(l.label).out(l.label).toSeq + vertices.size should be(2) + vertices.foreach { v => + val vertex = v.asInstanceOf[S2Vertex] + // TODO: we have too many id. this is ugly and confusing so fix me. 
+ vertex.id.innerId == srcB.id.innerId || vertex.id.innerId == srcC.id.innerId should be(true) + logger.debug(s"[Vertex]: $v") + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala new file mode 100644 index 0000000..94883c9 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core + +import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta} +import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} +import org.apache.s2graph.core.utils.logger +import org.scalatest.FunSuite +import play.api.libs.json.{JsObject, Json} + +class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { + import S2Edge._ + initTests() + + val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "is_blocked", 1.toByte, "true", "boolean") + val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "time", 3.toByte, "-1", "long") + +// test("toLogString") { +// val testServiceName = serviceNameV2 +// val testLabelName = labelNameV2 +// val bulkQueries = List( +// ("1445240543366", "update", "{\"is_blocked\":true}"), +// ("1445240543362", "insert", "{\"is_hidden\":false}"), +// ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), +// ("1445240543363", "delete", "{}"), +// ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) +// +// val (srcId, tgtId, labelName) = ("1", "2", testLabelName) +// +// val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { +// val properties = fromJsonToProperties(Json.parse(props).as[JsObject]) +// Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString +// }).mkString("\n") +// +// val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + testLabelName + +// "\",\"service\":\"" + testServiceName + "\"" +// val expected = Seq( +// Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_blocked\":true}"), +// Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false}"), +// Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"is_hidden\":false,\"weight\":10}"), +// Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + 
attachedProps + "}"), +// Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + attachedProps + ",\"time\":1,\"weight\":-10}") +// ).map(_.mkString("\t")).mkString("\n") +// +// assert(bulkEdge === expected) +// } + + test("buildOperation") { + val schemaVersion = "v2" + val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = graph.newVertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + + val snapshotEdge = None + val propsWithTs = Map(timestampProp) + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + + val newVersion = 0L + + val newPropsWithTs = Map( + timestampProp, + testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) + ) + + val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.isDefined) + assert(edgeMutate.edgesToInsert.nonEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: snapshotEdge: None with newProps") { + val schemaVersion = "v2" + val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = graph.newVertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + + val snapshotEdge = None + val propsWithTs = Map(timestampProp) + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + + val newVersion = 0L + + val newPropsWithTs = Map( + timestampProp, + testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1) + ) + + val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) + 
logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.isDefined) + assert(edgeMutate.edgesToInsert.nonEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") { + val schemaVersion = "v2" + val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = graph.newVertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + + val snapshotEdge = None + val propsWithTs = Map(timestampProp) + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + + val newVersion = 0L + + val newPropsWithTs = propsWithTs + + val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, propsWithTs, newPropsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.isEmpty) + assert(edgeMutate.edgesToInsert.isEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: All props older than snapshotEdge's LastDeletedAt") { + val schemaVersion = "v2" + val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = graph.newVertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val oldPropsWithTs = Map( + timestampProp, + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val propsWithTs = Map( + timestampProp, + testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 2), + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) + + val snapshotEdge = 
Option(_snapshotEdge) + + + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + + val newVersion = 0L + val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.nonEmpty) + assert(edgeMutate.edgesToInsert.isEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } + + test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") { + val schemaVersion = "v2" + val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) + val srcVertex = graph.newVertex(vertexId) + val tgtVertex = srcVertex + + val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) + val oldPropsWithTs = Map( + timestampProp, + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val propsWithTs = Map( + timestampProp, + testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 4), + LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) + ) + + val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) + + val snapshotEdge = Option(_snapshotEdge) + + val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + + val newVersion = 0L + val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) + logger.info(edgeMutate.toLogString) + + assert(edgeMutate.newSnapshotEdge.nonEmpty) + assert(edgeMutate.edgesToInsert.nonEmpty) + assert(edgeMutate.edgesToDelete.isEmpty) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala index 12eae77..1997354 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala @@ -32,13 +32,13 @@ trait TestCommonWithModels { import InnerVal._ import types.HBaseType._ - var graph: Graph = _ + var graph: S2Graph = _ var config: Config = _ var management: Management = _ def initTests() = { config = ConfigFactory.load() - graph = new Graph(config)(ExecutionContext.Implicits.global) + graph = new S2Graph(config)(ExecutionContext.Implicits.global) management = new Management(graph) implicit val session = AutoSession http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala index 9e220cf..30011b1 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.benchmark import com.typesafe.config.{ConfigFactory, Config} -import org.apache.s2graph.core.{Management, Graph} +import org.apache.s2graph.core.{Management, S2Graph} import org.specs2.mutable.Specification import scalikejdbc.AutoSession http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala index cb6090e..25dd0e4 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala @@ -37,7 +37,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { val ts = System.currentTimeMillis() val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion) - def validate(label: Label)(edge: Edge)(sql: String)(expected: Boolean) = { + def validate(label: Label)(edge: S2Edge)(sql: String)(expected: Boolean) = { def debug(whereOpt: Try[Where]) = { println("==================") println(s"$whereOpt") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala index e530e65..17ecc87 100644 --- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.counter.helper import com.typesafe.config.Config import org.apache -import org.apache.s2graph.core.Graph +import org.apache.s2graph.core.S2Graph import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.counter import org.apache.s2graph.counter.config.S2CounterConfig @@ -37,7 +37,7 @@ class CounterAdmin(config: Config) { val s2config = new S2CounterConfig(config) val counterModel = new CounterModel(config) val graphOp = new GraphOperation(config) - val s2graph = new 
Graph(config)(scala.concurrent.ExecutionContext.global) + val s2graph = new S2Graph(config)(scala.concurrent.ExecutionContext.global) val storageManagement = new org.apache.s2graph.core.Management(s2graph) def setupCounterOnGraph(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala index c1d25f3..62174ad 100644 --- a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala +++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.counter.core import com.typesafe.config.ConfigFactory import org.apache.s2graph.core.mysqls.Label -import org.apache.s2graph.core.{Graph, Management} +import org.apache.s2graph.core.{S2Graph, Management} import org.apache.s2graph.counter.config.S2CounterConfig import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit import org.apache.s2graph.counter.core.v2.{GraphOperation, RankingStorageGraph} @@ -87,7 +87,7 @@ class RankingCounterSpec extends Specification with BeforeAfterAll { } val graphOp = new GraphOperation(config) - val graph = new Graph(config)(scala.concurrent.ExecutionContext.global) + val graph = new S2Graph(config)(scala.concurrent.ExecutionContext.global) val management = new Management(graph) management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") val strJs = http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala index a659ed3..37779dc 100644 --- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala @@ -19,7 +19,7 @@ package org.apache.s2graph.counter.loader.core -import org.apache.s2graph.core.{Edge, Graph, GraphUtil} +import org.apache.s2graph.core.{S2Edge, S2Graph, GraphUtil} import org.apache.s2graph.counter.loader.config.StreamingConfig import org.apache.s2graph.counter.models.CounterModel import org.apache.s2graph.spark.config.S2ConfigFactory @@ -32,12 +32,12 @@ object CounterEtlFunctions extends Logging { lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE lazy val config = S2ConfigFactory.config lazy val counterModel = new CounterModel(config) - lazy val graph = new Graph(config)(scala.concurrent.ExecutionContext.Implicits.global) + lazy val graph = new S2Graph(config)(scala.concurrent.ExecutionContext.Implicits.global) - def logToEdge(line: String): Option[Edge] = { + def logToEdge(line: String): Option[S2Edge] = { for { - elem <- graph.toGraphElement(line) if elem.isInstanceOf[Edge] - edge <- Some(elem.asInstanceOf[Edge]).filter { x => + elem <- graph.toGraphElement(line) if elem.isInstanceOf[S2Edge] + edge <- Some(elem.asInstanceOf[S2Edge]).filter { x => filterOps.contains(x.op) } } yield { @@ -50,8 +50,8 @@ object CounterEtlFunctions extends Logging { * 1427082276804 insert edge 19073318 52453027_93524145648511699 story_user_ch_doc_view {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"} */ for { - elem <- graph.toGraphElement(line) if elem.isInstanceOf[Edge] - edge <- Some(elem.asInstanceOf[Edge]).filter { x => + 
elem <- graph.toGraphElement(line) if elem.isInstanceOf[S2Edge] + edge <- Some(elem.asInstanceOf[S2Edge]).filter { x => filterOps.contains(x.op) } } yield { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala index 8ae1de3..6985758 100644 --- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala +++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.counter.loader.core import com.typesafe.config.ConfigFactory import org.apache.s2graph.core.mysqls.{Label, Service} import org.apache.s2graph.core.types.HBaseType -import org.apache.s2graph.core.{Graph, Management} +import org.apache.s2graph.core.{S2Graph, Management} import org.apache.s2graph.counter.models.DBModel import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} @@ -33,7 +33,7 @@ class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with Match val cluster = config.getString("hbase.zookeeper.quorum") DBModel.initialize(config) - val graph = new Graph(config)(global) + val graph = new S2Graph(config)(global) val management = new Management(graph) override def beforeAll: Unit = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala index
a6f8f5c..a47fda7 100644 --- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala +++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala @@ -39,7 +39,7 @@ import org.apache.s2graph.core.rest.RestHandler import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult} import org.apache.s2graph.core.utils.Extensions._ import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{Graph, PostProcess} +import org.apache.s2graph.core.{S2Graph, PostProcess} import play.api.libs.json._ import scala.collection.mutable @@ -217,7 +217,7 @@ object NettyServer extends App { val maxBodySize = Try(config.getInt("max.body.size")).recover { case _ => 65536 * 2 }.get // init s2graph with config - val s2graph = new Graph(config)(ec) + val s2graph = new S2Graph(config)(ec) val rest = new RestHandler(s2graph)(ec) val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala index 692ab1e..e5fc75d 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala @@ -23,7 +23,7 @@ import java.util.concurrent.Executors import org.apache.s2graph.core.rest.{RequestParser, RestHandler} import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{ExceptionHandler, Graph, Management} +import org.apache.s2graph.core.{ExceptionHandler, S2Graph, Management} import org.apache.s2graph.rest.play.actors.QueueActor import org.apache.s2graph.rest.play.config.Config import org.apache.s2graph.rest.play.controllers.ApplicationController @@ -36,7 +36,7 @@ 
import scala.io.Source import scala.util.Try object Global extends WithFilters(new GzipFilter()) { - var s2graph: Graph = _ + var s2graph: S2Graph = _ var storageManagement: Management = _ var s2parser: RequestParser = _ var s2rest: RestHandler = _ @@ -50,7 +50,7 @@ object Global extends WithFilters(new GzipFilter()) { val config = Config.conf.underlying // init s2graph with config - s2graph = new Graph(config)(ec) + s2graph = new S2Graph(config)(ec) storageManagement = new Management(s2graph) s2parser = new RequestParser(s2graph) s2rest = new RestHandler(s2graph)(ec) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala index d46d8d2..75ffbc3 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit import akka.actor._ import org.apache.s2graph.core.ExceptionHandler._ import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphElement} +import org.apache.s2graph.core.{ExceptionHandler, S2Graph, GraphElement} import org.apache.s2graph.rest.play.actors.Protocol.FlushAll import org.apache.s2graph.rest.play.config.Config import play.api.Play.current @@ -46,7 +46,7 @@ object QueueActor { var router: ActorRef = _ // Akka.system.actorOf(props(), name = "queueActor") - def init(s2: Graph, walLogHandler: ExceptionHandler) = { + def init(s2: S2Graph, walLogHandler: ExceptionHandler) = { router = Akka.system.actorOf(props(s2, walLogHandler)) } @@ -56,10 +56,10 @@ object QueueActor { Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2) } - def props(s2: Graph, 
walLogHandler: ExceptionHandler): Props = Props(classOf[QueueActor], s2, walLogHandler) + def props(s2: S2Graph, walLogHandler: ExceptionHandler): Props = Props(classOf[QueueActor], s2, walLogHandler) } -class QueueActor(s2: Graph, walLogHandler: ExceptionHandler) extends Actor with ActorLogging { +class QueueActor(s2: S2Graph, walLogHandler: ExceptionHandler) extends Actor with ActorLogging { import Protocol._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index b1635fb..aed8ced 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -38,7 +38,7 @@ object EdgeController extends Controller { import ApplicationController._ import play.api.libs.concurrent.Execution.Implicits._ - private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph + private val s2: S2Graph = org.apache.s2graph.rest.play.Global.s2graph private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler @@ -51,9 +51,9 @@ object EdgeController extends Controller { val kafkaTopic = toKafkaTopic(graphElem.isAsync) graphElem match { - case v: Vertex => + case v: S2Vertex => enqueue(kafkaTopic, graphElem, tsv) - case e: Edge => + case e: S2Edge => e.innerLabel.extraOptions.get("walLog") match { case None => enqueue(kafkaTopic, e, tsv) @@ -73,7 +73,7 @@ object EdgeController extends Controller { } } - private def toDeleteAllFailMessages(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long ) = { + private def 
toDeleteAllFailMessages(srcVertices: Seq[S2Vertex], labels: Seq[Label], dir: Int, ts: Long ) = { for { vertex <- srcVertices id = vertex.id.toString @@ -92,7 +92,7 @@ object EdgeController extends Controller { val result = s2.mutateElements(elements.map(_._1), true) result onComplete { results => results.get.zip(elements).map { - case (false, (e: Edge, tsv: String)) => + case (false, (e: S2Edge, tsv: String)) => val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){ toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts) } else{ @@ -267,7 +267,7 @@ object EdgeController extends Controller { } def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], - ts: Long, vertices: Seq[Vertex]) = { + ts: Long, vertices: Seq[S2Vertex]) = { val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) if (withWait) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala index 72e6e82..43f0b15 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.rest.play.controllers import org.apache.s2graph.core.rest.RequestParser import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphExceptions} +import org.apache.s2graph.core.{ExceptionHandler, S2Graph, GraphExceptions} import org.apache.s2graph.rest.play.actors.QueueActor import org.apache.s2graph.rest.play.config.Config import play.api.libs.json.{JsValue, Json} @@ -30,7 +30,7 @@ import 
play.api.mvc.{Controller, Result} import scala.concurrent.Future object VertexController extends Controller { - private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph + private val s2: S2Graph = org.apache.s2graph.rest.play.Global.s2graph private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
