Bug fix for S2Edge.vertices.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/350e2e6a Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/350e2e6a Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/350e2e6a Branch: refs/heads/master Commit: 350e2e6a2ade36ee11a29a0bbb1bacf2b2d7e345 Parents: 4425859 Author: DO YUNG YOON <[email protected]> Authored: Mon Apr 10 23:04:10 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Apr 10 23:05:32 2017 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/QueryResult.scala | 44 +++- .../scala/org/apache/s2graph/core/S2Edge.scala | 31 ++- .../scala/org/apache/s2graph/core/S2Graph.scala | 43 ++-- .../org/apache/s2graph/core/S2Vertex.scala | 3 +- .../s2graph/core/mysqls/ServiceColumn.scala | 3 +- .../apache/s2graph/core/storage/Storage.scala | 3 +- .../core/storage/hbase/AsynchbaseStorage.scala | 31 ++- .../serde/vertex/VertexDeserializable.scala | 5 +- .../serde/vertex/VertexSerializable.scala | 4 +- .../s2graph/core/utils/SafeUpdateCache.scala | 4 +- .../core/tinkerpop/S2GraphProvider.scala | 19 +- .../core/tinkerpop/structure/S2GraphTest.scala | 254 ++++++++++++------- 12 files changed, 281 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index bad8361..a7f485c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -27,24 +27,42 @@ import 
scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Seq, mutable} object QueryResult { + def fromVertices(graph: S2Graph, vertices: Seq[S2Vertex], queryParams: Seq[QueryParam]): StepResult = { + val edgeWithScores = vertices.flatMap { vertex => + queryParams.map { queryParam => + val label = queryParam.label + val currentTs = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timestamp -> + InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) + + val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) + val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label) + edgeWithScore + + } + } + StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false) + } + def fromVertices(graph: S2Graph, query: Query): StepResult = { if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { StepResult.Empty } else { - val queryParam = query.steps.head.queryParams.head - val label = queryParam.label - val currentTs = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timestamp -> - InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) - val edgeWithScores = for { - vertex <- query.vertices - } yield { - val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) - val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label) - edgeWithScore - } - StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false) + fromVertices(graph, query.vertices, query.steps.head.queryParams) +// val queryParam = query.steps.head.queryParams.head +// val label = queryParam.label +// val currentTs = System.currentTimeMillis() +// val propsWithTs = Map(LabelMeta.timestamp -> +// InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) +// val edgeWithScores = for { +// vertex <- query.vertices +// 
} yield { +// val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) +// val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label) +// edgeWithScore +// } +// StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 162ada5..6321dd5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -596,18 +596,26 @@ case class S2Edge(innerGraph: S2Graph, override def vertices(direction: Direction): util.Iterator[structure.Vertex] = { val arr = new util.ArrayList[Vertex]() + direction match { case Direction.OUT => - val newVertexId = VertexId(ServiceColumn.findById(srcForVertex.id.colId), srcForVertex.innerId) - arr.add(srcVertex.copy(id = newVertexId)) -// arr.add(srcVertex) + val newVertexId = this.direction match { + case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId) + case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) + case _ => throw new IllegalArgumentException("direction can only be out/in.") + } + innerGraph.getVertex(newVertexId).foreach(arr.add) case Direction.IN => - val newVertexId = VertexId(ServiceColumn.findById(tgtForVertex.id.colId), tgtForVertex.innerId) - arr.add(tgtVertex.copy(id = newVertexId)) -// arr.add(tgtVertex) + val newVertexId = this.direction match { + case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId) + case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) + case _ => throw new IllegalArgumentException("direction can only be out/in.") + } + 
innerGraph.getVertex(newVertexId).foreach(arr.add) case _ => - arr.add(srcVertex) - arr.add(tgtVertex) + import scala.collection.JavaConversions._ + vertices(Direction.OUT).foreach(arr.add) + vertices(Direction.IN).foreach(arr.add) } arr.iterator() } @@ -674,11 +682,8 @@ case class S2Edge(innerGraph: S2Graph, override def id(): AnyRef = { // NOTE: xxxForVertex makes direction to be "out" - if (this.innerLabel.consistencyLevel == "strong") { - EdgeId(srcForVertex.innerId, tgtForVertex.innerId, label(), direction, 0) - } else { - EdgeId(srcForVertex.innerId, tgtForVertex.innerId, label(), direction, ts) - } + val timestamp = if (this.innerLabel.consistencyLevel == "string") 0l else ts + EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), direction, timestamp) } override def label(): String = innerLabel.label http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 2771415..7e037b8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -70,7 +70,7 @@ object S2Graph { "cache.ttl.seconds" -> java.lang.Integer.valueOf(60), "hbase.client.retries.number" -> java.lang.Integer.valueOf(20), "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort), - "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000), + "hbase.rpc.timeout" -> java.lang.Integer.valueOf(60000), "max.retry.number" -> java.lang.Integer.valueOf(100), "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10), "max.back.off" -> java.lang.Integer.valueOf(100), @@ -530,8 +530,9 @@ object S2Graph { @Graph.OptIn(Graph.OptIn.SUITE_STRUCTURE_STANDARD) @Graph.OptOuts(value = Array( - // new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest", method="*", reason="no"), // pass - // new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest", method="*", reason="no"), // pass +// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest", method="*", reason="no"), // pass +// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest", method="*", reason="no"), // pass + new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", method="*", reason="no"), // pass new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", method="*", reason="no"), // pass @@ -1370,22 +1371,22 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs) snapshotEdge } - - /** - * internal helper to actually store a single edge based on given peramters. - * - * Note that this is used from S2Vertex to implement blocking interface from Tp3. - * Once tp3 provide AsyncStep, then this can be changed to return Java's CompletableFuture. - * - * @param srcVertex - * @param tgtVertex - * @param labelName - * @param direction - * @param props - * @param ts - * @param operation - * @return - */ +// +// /** +// * internal helper to actually store a single edge based on given peramters. +// * +// * Note that this is used from S2Vertex to implement blocking interface from Tp3. +// * Once tp3 provide AsyncStep, then this can be changed to return Java's CompletableFuture. 
+// * +// * @param srcVertex +// * @param tgtVertex +// * @param labelName +// * @param direction +// * @param props +// * @param ts +// * @param operation +// * @return +// */ // private[core] def addEdgeInner(srcVertex: S2Vertex, // tgtVertex: S2Vertex, // labelName: String, @@ -1466,6 +1467,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val queryParams = labelNameWithDirs.map { case (l, direction) => QueryParam(labelName = l, direction = direction) } + val query = Query.toQuery(Seq(vertex), queryParams) getEdges(query).map { stepResult => @@ -1529,8 +1531,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = { if (edgeIds.isEmpty) { // FIXME - val edges = Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator - edges.filterNot(_.isDegree).filterNot(_.direction == "in") + Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator } else { Await.result(edgesAsync(edgeIds: _*), WaitTimeout) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala index e13f581..b80a54c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala @@ -209,7 +209,8 @@ case class S2Vertex(graph: S2Graph, val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) val edge = graph.newEdge(this, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) - val future = graph.mutateEdges(edge.relatedEdges, withWait = true) + // edge.relatedEdges + val future = graph.mutateEdges(Seq(edge), withWait = true) 
Await.ready(future, graph.WaitTimeout) edge } catch { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala index 8614132..32ca653 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -25,7 +25,8 @@ package org.apache.s2graph.core.mysqls import org.apache.s2graph.core.JSONParser import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.types.{HBaseType, InnerValLikeWithTs, InnerValLike} +import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs} +import org.apache.s2graph.core.utils.logger import play.api.libs.json.Json import scalikejdbc._ object ServiceColumn extends Model[ServiceColumn] { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index a9b523c..a8dec7e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -285,6 +285,7 @@ abstract class Storage[Q, R](val graph: S2Graph, val queryParam = QueryParam.Empty val q = Query.toQuery(Seq(vertex), Seq(queryParam)) val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + fetchVertexKeyValues(queryRequest).map { kvs => fromResult(kvs, vertex.serviceColumn.schemaVersion) } recoverWith { case ex: Throwable => @@ -321,7 +322,7 
@@ abstract class Storage[Q, R](val graph: S2Graph, mutateRet <- Future.sequence(mutateEdgeFutures) } yield mutateRet - composed.map(_.forall(identity)).map { ret => idxs.map(idx => idx -> ret) } + composed.map(_.forall(identity)).map { ret => idxs.map( idx => idx -> ret) } } Future.sequence(mutateEdges).map { squashedRets => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index dab5aae..e41fe27 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -23,7 +23,7 @@ package org.apache.s2graph.core.storage.hbase import java.util import java.util.Base64 -import java.util.concurrent.{TimeUnit, ExecutorService, Executors} +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import com.stumbleupon.async.{Callback, Deferred} import com.typesafe.config.Config @@ -34,17 +34,17 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{TableName, HColumnDescriptor, HBaseConfiguration, HTableDescriptor} +import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.security.UserGroupInformation - import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, 
ScanWithRange} -import org.apache.s2graph.core.types.{VertexId, HBaseType} +import org.apache.s2graph.core.types.{HBaseType, VertexId} import org.apache.s2graph.core.utils._ import org.hbase.async.FilterList.Operator.MUST_PASS_ALL import org.hbase.async._ + import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.concurrent._ @@ -96,8 +96,8 @@ object AsynchbaseStorage { def initLocalHBase(config: Config, overwrite: Boolean = true): ExecutorService = { - import java.net.Socket import java.io.{File, IOException} + import java.net.Socket lazy val hbaseExecutor = { val executor = Executors.newSingleThreadExecutor() @@ -277,6 +277,7 @@ class AsynchbaseStorage(override val graph: S2Graph, override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = { val edge = toRequestEdge(queryRequest, Nil) val rpc = buildRequest(queryRequest, edge) + fetchKeyValues(rpc) } @@ -445,6 +446,7 @@ class AsynchbaseStorage(override val graph: S2Graph, val edge = toRequestEdge(queryRequest, parentEdges) val request = buildRequest(queryRequest, edge) + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes) @@ -655,10 +657,8 @@ class AsynchbaseStorage(override val graph: S2Graph, override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { def fromResult(kvs: Seq[SKeyValue], version: String): Option[S2Vertex] = { - if (kvs.isEmpty) None else vertexDeserializer.fromKeyValues(kvs, None) -// .map(S2Vertex(graph, _)) } val futures = vertices.map { vertex => @@ -677,20 +677,25 @@ class AsynchbaseStorage(override val graph: S2Graph, Future.sequence(futures).map { result => result.toList.flatten } } + //TODO: Limited to 100000 edges per hbase table. fix this later. 
override def fetchEdgesAll(): Future[Seq[S2Edge]] = { val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => + val distinctLabels = labels.toSet val scan = AsynchbasePatcher.newScanner(client, hTableName) scan.setFamily(Serializable.edgeCf) scan.setMaxVersions(1) - scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { + scan.nextRows(100000).toFuture(emptyKeyValuesLs).map { case null => Seq.empty case kvsLs => - kvsLs.flatMap { kvs => - kvs.flatMap { kv => - indexEdgeDeserializer.fromKeyValues(Seq(kv), None) + kvsLs.flatMap { kvs => + kvs.flatMap { kv => + val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + + indexEdgeDeserializer.fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree) + } } - } } } @@ -699,6 +704,7 @@ class AsynchbaseStorage(override val graph: S2Graph, override def fetchVerticesAll(): Future[Seq[S2Vertex]] = { val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => + val distinctColumns = columns.toSet val scan = AsynchbasePatcher.newScanner(client, hTableName) scan.setFamily(Serializable.vertexCf) scan.setMaxVersions(1) @@ -708,6 +714,7 @@ class AsynchbaseStorage(override val graph: S2Graph, case kvsLs => kvsLs.flatMap { kvs => vertexDeserializer.fromKeyValues(kvs, None) + .filter(v => distinctColumns(v.serviceColumn)) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala index b4a00e6..f8921a8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala 
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala @@ -23,7 +23,8 @@ import org.apache.s2graph.core.mysqls.{ColumnMeta, Label} import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable} import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, VertexId} -import org.apache.s2graph.core.{S2Graph, QueryParam, S2Vertex} +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{QueryParam, S2Graph, S2Vertex} import scala.collection.mutable.ListBuffer @@ -33,7 +34,6 @@ class VertexDeserializable(graph: S2Graph, cacheElementOpt: Option[S2Vertex]): Option[S2Vertex] = { try { val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - val kv = kvs.head val version = HBaseType.DEFAULT_VERSION val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) @@ -64,6 +64,7 @@ class VertexDeserializable(graph: S2Graph, assert(maxTs != Long.MinValue) val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) + Option(vertex) } catch { case e: Exception => None http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala index 1dbcd00..ee147f1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala @@ -22,6 +22,8 @@ package org.apache.s2graph.core.storage.serde.vertex import 
org.apache.s2graph.core.S2Vertex import org.apache.s2graph.core.storage.StorageSerializable._ import org.apache.s2graph.core.storage.{SKeyValue, Serializable} +import org.apache.s2graph.core.utils.logger + import scala.collection.JavaConverters._ case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] { @@ -45,6 +47,6 @@ case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] } (base ++ belongsTo).map { case (qualifier, value) => SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) - } toSeq + }.toSeq } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index c54dcde..a98104c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -20,10 +20,12 @@ package org.apache.s2graph.core.utils import java.util.concurrent.atomic.AtomicBoolean + import com.google.common.cache.CacheBuilder + +import scala.collection.JavaConversions._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} -import scala.collection.JavaConversions._ object SafeUpdateCache { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index 18bf998..865717d 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -34,6 +34,7 @@ class S2GraphProvider extends AbstractGraphProvider { val config = ConfigFactory.load() val m = new java.util.HashMap[String, AnyRef]() m.put(Graph.GRAPH, classOf[S2Graph].getName) +// m.put("db.default.url", "jdbc:h2:mem:db1;MODE=MYSQL") m } @@ -64,8 +65,6 @@ class S2GraphProvider extends AbstractGraphProvider { } private def cleanupSchema(graph: Graph): Unit = { -// new File("./var/metastore").delete() - val s2Graph = graph.asInstanceOf[S2Graph] val mnt = s2Graph.getManagement() val defaultService = s2Graph.DefaultService @@ -139,12 +138,15 @@ class S2GraphProvider extends AbstractGraphProvider { ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, useCache = false) } - // knows props -// mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string", -// true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}""")) + if (testClass.getSimpleName == "DetachedEdgeTest") { + mnt.createLabel("knows", defaultService.serviceName, "person", "integer", defaultService.serviceName, "person", "integer", + true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}""")) + } else { + mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string", + true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}""")) + } + - mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", 
defaultService.serviceName, "vertex", "string", - true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}""")) // if (testClass.getSimpleName.contains("VertexTest") || (testClass.getSimpleName == "EdgeTest" && testName == "shouldAutotypeDoubleProperties")) { // mnt.createLabel("knows", defaultService.serviceName, "vertex", "string", defaultService.serviceName, "vertex", "string", // true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}""")) @@ -153,7 +155,8 @@ class S2GraphProvider extends AbstractGraphProvider { // true, defaultService.serviceName, Nil, knowsProp, "strong", None, None, options = Option("""{"skipReverse": false}""")) // } - val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer", Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string"))) + val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer", + Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string"))) val softwareColumn = Management.createServiceColumn(defaultService.serviceName, "software", "integer", Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("lang", "-", "string"))) val productColumn = Management.createServiceColumn(defaultService.serviceName, "product", "integer", Nil) val dogColumn = Management.createServiceColumn(defaultService.serviceName, "dog", "integer", Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/350e2e6a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala index 46f58a8..5454e24 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala @@ -19,21 +19,18 @@ package org.apache.s2graph.core.tinkerpop.structure -import java.io.File - import org.apache.s2graph.core.Management.JsonModel.Prop -import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core._ import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{Management, S2Graph, S2Vertex, TestCommonWithModels} -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal -import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex} -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource +import org.apache.tinkerpop.gremlin.structure._ +import org.apache.tinkerpop.gremlin.structure.util.Attachable +import org.apache.tinkerpop.gremlin.structure.util.detached.{DetachedEdge, DetachedFactory} import org.scalatest.{FunSuite, Matchers} class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { - import scala.collection.JavaConversions._ import scala.concurrent.ExecutionContext.Implicits.global initTests() @@ -187,7 +184,147 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { // test("addVertex with empty parameter") { // // } - test("aaa") { +// test("aaa") { +// val mnt = graph.management +// val defaultService = graph.DefaultService +// val defaultServiceColumn = graph.DefaultColumn +// val columnNames = Set(defaultServiceColumn.columnName, "person", "software", "product", "dog") +// val labelNames = Set("knows", "created", "bought", "test", "self", "friends", "friend", "hate", "collaborator", "test1", "test2", "test3", "pets", "walks") +// +// 
Management.deleteService(defaultService.serviceName) +// columnNames.foreach { columnName => +// Management.deleteColumn(defaultServiceColumn.service.serviceName, columnName) +// } +// labelNames.foreach { labelName => +// Management.deleteLabel(labelName) +// } +// +// val knows = mnt.createLabel("knows", +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer")), consistencyLevel = "strong", None, None, +// options = Option("""{"skipReverse": false}""")) +// +// val pets = mnt.createLabel("pets", +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None, +// options = Option("""{"skipReverse": false}""")) +// +// val walks = mnt.createLabel("walks", +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// true, defaultService.serviceName, Nil, Seq(Prop("location", "-", "string")), consistencyLevel = "strong", None, None, +// options = Option("""{"skipReverse": false}""")) +// +// val livesWith = mnt.createLabel("livesWith", +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None, +// options = Option("""{"skipReverse": false}""")) +// +// val friend = mnt.createLabel("friend", defaultService.serviceName, defaultServiceColumn.columnName, 
defaultServiceColumn.columnType, defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, +// true, defaultService.serviceName, Nil, +// Seq( +// Prop("name", "-", "string"), +// Prop("location", "-", "string"), +// Prop("status", "-", "string") +// ), +// "strong", None, None, +// options = Option("""{"skipReverse": false}""") +// ) +// +// val v1 = graph.addVertex("name", "marko") +// val v2 = graph.addVertex("name", "puppy") +// +// v1.addEdge("knows", v2, "since", Int.box(2010)) +// v1.addEdge("pets", v2) +// v1.addEdge("walks", v2, "location", "arroyo") +// v2.addEdge("knows", v1, "since", Int.box(2010)) +// +// v1.edges(Direction.BOTH).foreach { e => logger.error(s"[Edge]: $e")} +// } + +// test("bb") { +// val mnt = graph.management +// val defaultService = graph.DefaultService +// val defaultServiceColumn = graph.DefaultColumn +// val columnNames = Set(defaultServiceColumn.columnName, "person", "software", "product", "dog") +// val labelNames = Set("knows", "created", "bought", "test", "self", "friends", "friend", "hate", "collaborator", "test1", "test2", "test3", "pets", "walks") +// +// Management.deleteService(defaultService.serviceName) +// columnNames.foreach { columnName => +// Management.deleteColumn(defaultServiceColumn.service.serviceName, columnName) +// } +// labelNames.foreach { labelName => +// Management.deleteLabel(labelName) +// } +// val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer", +// Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string"))) +// val knows = mnt.createLabel("knows", +// defaultService.serviceName, "person", "integer", +// defaultService.serviceName, "person", "integer", +// true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer"), Prop("year", "0", "integer")), consistencyLevel = "strong", None, None) +// +// val created = 
mnt.createLabel("created", defaultService.serviceName, "person", "integer", defaultService.serviceName, "software", "integer", +// true, defaultService.serviceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None) +// +//// val v1 = graph.toVertex(graph.DefaultService.serviceName, "person", 1) +//// val v4 = graph.toVertex(graph.DefaultService.serviceName, "person", 4) +//// val ts = System.currentTimeMillis() +//// val edge = graph.newEdge(v1, v4, knows.get, +//// GraphUtil.directions("out"), GraphUtil.operations("insert"), +//// propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, knows.get.schemaVersion))) +// val v1 = graph.addVertex(T.label, "person", T.id, Int.box(1), "name", "marko") +// val v4 = graph.addVertex(T.label, "person", T.id, Int.box(4), "name", "vadas") +// +// val g = graph.traversal() +// v1.addEdge("knows", v4, "year", Int.box(2002)) +// +// def convertToEdgeId(outVertexName: String, edgeLabel: String, inVertexName: String): AnyRef = { +// g.V().has("name", outVertexName).outE(edgeLabel).as("e").inV.has("name", inVertexName).select[Edge]("e").next().id() +// } +// +// g.V().has("name", "marko").outE("knows").as("e").inV.foreach(e => logger.error(s"[Edge]: $e")) +// +//// .as("e").inV.has("name", "vadas").select[Edge]("e").next().id() +//// g.E(convertToEdgeId("marko", "knows", "vadas")).foreach(e => logger.error(s"[EDGE]: $e")) +//// val x = DetachedFactory.detach(g.E(convertToEdgeId("marko", "knows", "vadas")).next(), true) +////// .hashCode() +//// val y = DetachedFactory.detach(g.E(convertToEdgeId("marko", "knows", "vadas")).next(), true) +//// .hashCode() +//// logger.error(s"[X]: $x") +//// logger.error(s"[Y]: $y") +// +//// g.E().foreach(e => logger.error(s"[Edge]: $e")) +//// g.V().has("name", "marko").outE("knows").foreach(v => logger.error(s"[OutVertex]: $v")) +//// g.V().has("name", "vadas").inE("knows").foreach(v => logger.error(s"[InVertex]: $v")) +// +// +// @Test +// 
@LoadGraphWith(GraphData.MODERN) +// @FeatureRequirementSet(FeatureRequirementSet.Package.SIMPLE) +// def shouldConstructDetachedEdgeAsReference() { +// +// graph.traversal().E(convertToEdgeId("marko", "knows", "vadas")).next().property("year", 2002); +// val detachedEdge = DetachedFactory.detach(g.E(convertToEdgeId("marko", "knows", "vadas")).next(), false); +//// assertEquals(convertToEdgeId("marko", "knows", "vadas"), detachedEdge.id()); +//// assertEquals("knows", detachedEdge.label()); +//// assertEquals(DetachedVertex.class, detachedEdge.vertices(Direction.OUT).next().getClass()); +//// assertEquals(convertToVertexId("marko"), detachedEdge.vertices(Direction.OUT).next().id()); +//// assertEquals("person", detachedEdge.vertices(Direction.IN).next().label()); +//// assertEquals(DetachedVertex.class, detachedEdge.vertices(Direction.IN).next().getClass()); +//// assertEquals(convertToVertexId("vadas"), detachedEdge.vertices(Direction.IN).next().id()); +//// assertEquals("person", detachedEdge.vertices(Direction.IN).next().label()); +//// +//// assertEquals(0, IteratorUtils.count(detachedEdge.properties())); +// } +//// shouldConstructDetachedEdgeAsReference() +// } + def convertToEdgeId(g: GraphTraversalSource, outVertexName: String, edgeLabel: String, inVertexName: String): AnyRef = { + g.V().has("name", outVertexName).outE(edgeLabel).as("e").inV.has("name", inVertexName).select[Edge]("e").next().id() + } + test("ccc") { val mnt = graph.management val defaultService = graph.DefaultService val defaultServiceColumn = graph.DefaultColumn @@ -201,90 +338,29 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { labelNames.foreach { labelName => Management.deleteLabel(labelName) } - + val personColumn = Management.createServiceColumn(defaultService.serviceName, "person", "integer", + Seq(Prop(T.id.toString, "-1", "integer"), Prop("name", "-", "string"), Prop("age", "0", "integer"), Prop("location", "-", "string"))) val knows = 
mnt.createLabel("knows", - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer")), consistencyLevel = "strong", None, None, - options = Option("""{"skipReverse": false}""")) - - val pets = mnt.createLabel("pets", - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None, - options = Option("""{"skipReverse": false}""")) - - val walks = mnt.createLabel("walks", - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - true, defaultService.serviceName, Nil, Seq(Prop("location", "-", "string")), consistencyLevel = "strong", None, None, - options = Option("""{"skipReverse": false}""")) - - val livesWith = mnt.createLabel("livesWith", - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - true, defaultService.serviceName, Nil, Nil, consistencyLevel = "strong", None, None, - options = Option("""{"skipReverse": false}""")) - - val friend = mnt.createLabel("friend", defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, defaultService.serviceName, defaultServiceColumn.columnName, defaultServiceColumn.columnType, - true, defaultService.serviceName, Nil, - Seq( - Prop("name", "-", "string"), - Prop("location", "-", "string"), - Prop("status", "-", "string") - ), - "strong", None, None, - options = Option("""{"skipReverse": false}""") - ) -// (0 
until 2).foreach(i => graph.addVertex("myId", Int.box(i))) -// -// graph.vertices().foreach(v => -// graph.vertices().foreach(u => v.addEdge("knows", u, "myEdgeId", Int.box(12))) -// ) -// -// val v = graph.vertices().toSeq.head -// v.remove() -// -// graph.edges().foreach(e => -// logger.error(s"[Edge]: $e") -// ) + defaultService.serviceName, "person", "integer", + defaultService.serviceName, "person", "integer", + true, defaultService.serviceName, Nil, Seq(Prop("since", "0", "integer"), Prop("year", "0", "integer")), consistencyLevel = "strong", None, None) + val created = mnt.createLabel("created", + defaultService.serviceName, "person", "integer", + defaultService.serviceName, "person", "integer", + true, defaultService.serviceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None) -// val v1 = graph.addVertex(T.id, "v1", "name", "marko") -// val v2 = graph.addVertex(T.id, "101", "name", "puppy") -// v1.addEdge("knows", v2, "since", Int.box(2010)) -// v1.addEdge("pets", v2) -// v1.addEdge("walks", v2, "location", "arroyo") -// v2.addEdge("knows", v1, "since", Int.box(2010)) -// -// v1.edges(Direction.BOTH).foreach(edge => { -// v1.addEdge("livesWith", v2) -// v1.addEdge("walks", v2, "location", "river") -// edge.remove() -// }) -// -// val edges = v1.edges(Direction.BOTH) -// edges.foreach { e => -// logger.error(s"[Before]: $e") -// e.remove() -// } -// -// v1.edges(Direction.OUT).foreach { e => -// logger.error(s"[V1.Edge]: $e") -// } -// v2.edges(Direction.BOTH).foreach { e => -// logger.error(s"[V2.Edge]: $e") -// } - (0 until 25).foreach { i => - val v = graph.addVertex() - v.addEdge("friend", v) - } - graph.vertices().foreach(v => logger.error(s"[Vertex]: $v")) - graph.edges().foreach(e => logger.error(s"[Edge]: $e")) + val g = graph.traversal() + val v1 = graph.addVertex(T.label, "person", T.id, Int.box(1), "name", "josh") + val v4 = graph.addVertex(T.label, "person", T.id, Int.box(4), "name", "lop") + val e = v1.addEdge("created", v4) - 
graph.edges().foreach(e => e.remove) + val toDetach = g.E(convertToEdgeId(g, "josh", "created", "lop")).next() + val outV = toDetach.vertices(Direction.OUT).next() + val detachedEdge = DetachedFactory.detach(toDetach, true) + val attached = detachedEdge.attach(Attachable.Method.get(outV)) - graph.edges().foreach(e => logger.error(s"[Edge]: $e")) + assert(toDetach.equals(attached)) + assert(!attached.isInstanceOf[DetachedEdge]) } - } \ No newline at end of file
