Remove newVertex/newEdge/newVertexId from S2GraphLike.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/eabe7570 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/eabe7570 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/eabe7570 Branch: refs/heads/master Commit: eabe7570a9ef9ec0cb7b36d518d5bef4d1fd53dd Parents: 6403bc9 Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 10 22:57:51 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Nov 11 10:40:19 2017 +0900 ---------------------------------------------------------------------- .../loader/subscriber/GraphSubscriber.scala | 67 +---- .../loader/subscriber/TransferToHFile.scala | 4 +- .../org/apache/s2graph/core/QueryResult.scala | 2 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 8 +- .../org/apache/s2graph/core/S2EdgeBuilder.scala | 2 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 256 ++----------------- .../org/apache/s2graph/core/S2GraphLike.scala | 45 +--- .../org/apache/s2graph/core/S2GraphTp.scala | 98 +++++++ .../org/apache/s2graph/core/S2VertexLike.scala | 3 +- .../apache/s2graph/core/TraversalHelper.scala | 153 ++++++++++- .../s2graph/core/rest/RequestParser.scala | 2 +- .../tall/IndexEdgeDeserializable.scala | 15 +- .../wide/IndexEdgeDeserializable.scala | 15 +- .../tall/SnapshotEdgeDeserializable.scala | 12 +- .../wide/SnapshotEdgeDeserializable.scala | 13 +- .../vertex/tall/VertexDeserializable.scala | 3 +- .../vertex/wide/VertexDeserializable.scala | 3 +- .../org/apache/s2graph/core/S2EdgeTest.scala | 24 +- .../s2graph/core/TestCommonWithModels.scala | 3 +- .../s2graph/core/parsers/WhereParserTest.scala | 22 +- .../s2graph/core/storage/StorageIOTest.scala | 4 +- .../core/storage/hbase/IndexEdgeTest.scala | 6 +- .../process/S2GraphProcessStandardTest.scala | 4 +- .../S2GraphStructureIntegrateTest.scala | 4 +- .../S2GraphStructureStandardTest.scala | 4 +- 25 files changed, 354 insertions(+), 418 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala index a371b6b..6ecb070 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -67,6 +67,7 @@ object GraphSubscriberHelper extends WithKafka { var g: S2Graph = null var management: Management = null val conns = new scala.collection.mutable.HashMap[String, Connection]() + var builder: GraphElementBuilder = null def toOption(s: String) = { s match { @@ -82,6 +83,7 @@ object GraphSubscriberHelper extends WithKafka { val ec = ExecutionContext.Implicits.global g = new S2Graph(config)(ec) management = new Management(g) + builder = g.elementBuilder } } @@ -106,7 +108,7 @@ object GraphSubscriberHelper extends WithKafka { (statFunc: (String, Int) => Unit): Iterable[GraphElement] = { (for (msg <- msgs) yield { statFunc("total", 1) - g.elementBuilder.toGraphElement(msg, labelMapping) match { + builder.toGraphElement(msg, labelMapping) match { case Some(e) if e.isInstanceOf[S2Edge] => statFunc("EdgeParseOk", 1) e.asInstanceOf[S2Edge] @@ -122,69 +124,6 @@ object GraphSubscriberHelper extends WithKafka { }).toList } -// private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], elementsSize: Int, tryNum: Int) -// (statFunc: (String, Int) => Unit, statPrefix: String = "edge"): Unit = { -// if (tryNum <= 0) { -// statFunc("errorStore", elementsSize) -// throw new RuntimeException(s"retry failed after $maxTryNum") -// } -// val conn = getConn(zkQuorum) -// val 
mutator = conn.getBufferedMutator(TableName.valueOf(tableName)) -// // val table = conn.getTable(TableName.valueOf(tableName)) -// // table.setAutoFlush(false, false) -// -// try { -// puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) } -// mutator.mutate(puts) -// // table.put(puts) -// statFunc(s"$statPrefix:storeOk", elementsSize) -// } catch { -// case e: Throwable => -// e.printStackTrace() -// Thread.sleep(sleepPeriod) -// storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 1)(statFunc) -// } finally { -// mutator.close() -// // table.close() -// } -// } -// -// def storeDegreeBulk(zkQuorum: String, tableName: String) -// (degrees: Iterable[(String, String, String, Int)], labelMapping: Map[String, String] = Map.empty) -// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = { -// val counts = HashMap[String, Long]() -// val statFunc = storeStat(counts)(mapAccOpt) _ -// -// for { -// (vertexId, labelName, direction, degreeVal) <- degrees -// incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, labelName, direction, degreeVal) -// } { -// storeRec(zkQuorum, tableName, incrementRequests, degrees.size, maxTryNum)(statFunc, "degree") -// } -// counts -// } -// def storeBulk(zkQuorum: String, tableName: String) -// (msgs: Seq[String], labelMapping: Map[String, String] = Map.empty, autoCreateEdge: Boolean = false) -// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = { -// -// val counts = HashMap[String, Long]() -// val statFunc = storeStat(counts)(mapAccOpt) _ -// val elements = toGraphElements(msgs, labelMapping)(statFunc) -// -// val puts = elements.flatMap { element => -// element match { -// case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == GraphUtil.operations("insertBulk") => -// v.buildPuts() -// case e: Edge if e.op == GraphUtil.operations("insert") || e.op == GraphUtil.operations("insertBulk") => -// EdgeWriter(e).insertBulkForLoader(autoCreateEdge) -// case _ => Nil 
-// } -// } toList -// -// storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc) -// counts -// } - def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = { counts.put(key, counts.getOrElse(key, 0L) + value) mapAccOpt match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index a7b4e00..6aaf6fd 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -97,11 +97,11 @@ object TransferToHFile extends SparkApp { val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse { throw new RuntimeException(s"$vertexId can not be converted into innerval") } - val vertex = GraphSubscriberHelper.g.newVertex(SourceVertexId(label.srcColumn, innerVal)) + val vertex = GraphSubscriberHelper.builder.newVertex(SourceVertexId(label.srcColumn, innerVal)) val ts = System.currentTimeMillis() val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs) + val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs) edge.edgesWithIndex.flatMap { indexEdge => GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index 7d187c6..37ddf06 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -35,7 +35,7 @@ object QueryResult { val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) - val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) + val edge = graph.elementBuilder.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label) edgeWithScore http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 3e33ed4..3eb1fa7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -817,10 +817,10 @@ object S2Edge { val belongLabelIds = Seq(e.getLabelId()) if (e.getDir() == GraphUtil.directions("in")) { val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn) - e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) + e.innerGraph.elementBuilder.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) } else { val 
srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn) - e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) + e.innerGraph.elementBuilder.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) } } @@ -828,10 +828,10 @@ object S2Edge { val belongLabelIds = Seq(e.getLabelId()) if (e.getDir() == GraphUtil.directions("in")) { val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn) - e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) + e.innerGraph.elementBuilder.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) } else { val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn) - e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) + e.innerGraph.elementBuilder.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala index 4004a13..dc0baa2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala @@ -80,7 +80,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) { def updateTgtVertex(id: InnerValLike): S2EdgeLike = { val newId = TargetVertexId(edge.tgtVertex.id.column, id) - val newTgtVertex = edge.innerGraph.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props) + 
val newTgtVertex = edge.innerGraph.elementBuilder.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props) copyEdge(tgtVertex = newTgtVertex) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index f80d5c2..3905442 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.core import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} @@ -96,6 +96,8 @@ object S2Graph { val DefaultColumnName = "vertex" val DefaultLabelName = "_s2graph" + var hbaseExecutor: ExecutorService = _ + val graphStrategies: TraversalStrategies = TraversalStrategies.GlobalCache.getStrategies(classOf[Graph]).addStrategies(S2GraphStepStrategy.instance) @@ -130,7 +132,14 @@ object S2Graph { logger.info(s"[InitStorage]: $storageBackend") storageBackend match { - case "hbase" => new AsynchbaseStorage(graph, config) + case "hbase" => + hbaseExecutor = + if (config.getString("hbase.zookeeper.quorum") == "localhost") + AsynchbaseStorage.initLocalHBase(config) + else + null + + new AsynchbaseStorage(graph, config) case _ => throw new RuntimeException("not supported storage.") } } @@ -150,94 +159,6 @@ object S2Graph { } } - [email protected](value = Array( - new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD), - new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD) -)) [email protected](value = Array( - /* Process */ 
- /* branch: passed all. */ - /* filter */ - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."), - - /* map */ - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. 
fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."), - - /* sideEffect */ - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"), - - /* compliance */ - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw 
FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."), - - /* Structure */ - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", 
method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. 
all label need to be pre-configured."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."), - new 
Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. not sure why."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = 
"shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"), - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"), - - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed. 
- - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no") -)) class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike { var apacheConfiguration: Configuration = _ @@ -457,10 +378,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } else { val filterOutFuture = q.queryOption.filterOutQuery match { case None => fallback - case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery) } for { - stepResult <- getEdgesStepInner(q) + stepResult <- traversalHelper.getEdgesStepInner(q) filterOutInnerResult <- filterOutFuture } yield { if (filterOutInnerResult.isEmpty) stepResult @@ -480,7 +401,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap else { val filterOutFuture = mq.queryOption.filterOutQuery match { case None => fallback - case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery) } val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) }) @@ -515,7 +436,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { - fetchAndDeleteAll(queries, requestTs) + traversalHelper.fetchAndDeleteAll(queries, requestTs) } { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } @@ -546,12 +467,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } override def getVertex(vertexId: VertexId): Option[S2VertexLike] = { - val v = newVertex(vertexId) + val v = elementBuilder.newVertex(vertexId) Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout) } override def fetchEdges(vertex: S2VertexLike, 
labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = { - Await.result(fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout) + Await.result(traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout) } override def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = { @@ -574,149 +495,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } } - fetchEdgesAsync(vertex, labelNameWithDirs.distinct) + traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs.distinct) } def isRunning(): Boolean = running.get() - - /** Private **/ - - private def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { - Try { - if (q.steps.isEmpty) fallback - else { - def fetch: Future[StepResult] = { - val startStepInnerResult = QueryResult.fromVertices(this, q) - q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => - for { - prevStepInnerResult <- prevStepInnerResultFuture - currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) - } yield { - currentStepInnerResult.copy( - accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, - failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount - ) - } - } - } - - fetch - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } - - private def fetchStep(orgQuery: Query, - stepIdx: Int, - stepInnerResult: StepResult, - buildLastStepInnerResult: Boolean = false): Future[StepResult] = { - if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) - else { - val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = - traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult) - - val fetchedLs = 
fetches(queryRequests, prevStepTgtVertexIdEdges) - - traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, - fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) - } - } - - private def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { - - val reqWithIdxs = queryRequests.zipWithIndex - val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) - val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => - for { - prev <- prevFuture - cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) - } yield { - prev ++ reqWithIdxs.map(_._2).zip(cur).toMap - } - } - aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } - } - - private def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { - val futures = queries.map(getEdgesStepInner(_, true)) - val future = for { - stepInnerResultLs <- Future.sequence(futures) - (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) - } yield { - // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") - (allDeleted, ret) - } - - Extensions.retryOnFailure(MaxRetryNum) { - future - } { - logger.error(s"fetch and deleteAll failed.") - (true, false) - } - - } - - private def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], - requestTs: Long): Future[(Boolean, Boolean)] = { - stepInnerResultLs.foreach { stepInnerResult => - if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") - } - val futures = for { - stepInnerResult <- stepInnerResultLs - filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => - (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree - } - edgesToDelete = elementBuilder.buildEdgesToDelete(filtered, requestTs) - if edgesToDelete.nonEmpty - } yield { - val head = 
edgesToDelete.head - val label = head.edge.innerLabel - val stepResult = StepResult(edgesToDelete, Nil, Nil, false) - val ret = label.schemaVersion match { - case HBaseType.VERSION3 | HBaseType.VERSION4 => - if (label.consistencyLevel == "strong") { - /* - * read: snapshotEdge on queryResult = O(N) - * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) - */ - mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) - } else { - getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) - } - case _ => - - /* - * read: x - * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) - */ - getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) - } - ret - } - - if (futures.isEmpty) { - // all deleted. - Future.successful(true -> true) - } else { - Future.sequence(futures).map { rets => false -> rets.forall(identity) } - } - } - - private def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = { - val queryParams = labelNameWithDirs.map { case (l, direction) => - QueryParam(labelName = l, direction = direction.toLowerCase) - } - - val query = Query.toQuery(Seq(vertex), queryParams) - val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) } - val ls = new util.ArrayList[Edge]() - fetches(queryRequests, Map.empty).map { stepResultLs => - stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge))) - ls.iterator() - } - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index a58f1e0..1b80cb4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -58,8 +58,6 @@ trait S2GraphLike extends Graph { override def features() = s2Features - def nextLocalLongId = localLongId.getAndIncrement() - def fallback = Future.successful(StepResult.Empty) def defaultStorage: Storage @@ -103,34 +101,6 @@ trait S2GraphLike extends Graph { def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] /** Convert to Graph Element **/ - def newEdge(srcVertex: S2VertexLike, - tgtVertex: S2VertexLike, - innerLabel: Label, - dir: Int, - op: Byte = GraphUtil.defaultOpByte, - version: Long = System.currentTimeMillis(), - propsWithTs: S2Edge.State, - parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[S2EdgeLike] = None, - pendingEdgeOpt: Option[S2EdgeLike] = None, - statusCode: Byte = 0, - lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = - elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, - parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - - def newVertexId(service: Service, - column: ServiceColumn, - id: Any): VertexId = - elementBuilder.newVertexId(service, column, id) - - def newVertex(id: VertexId, - ts: Long = System.currentTimeMillis(), - props: S2Vertex.Props = S2Vertex.EmptyProps, - op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = - elementBuilder.newVertex(id, ts, props, op, belongLabelIds) - def toVertex(serviceName: String, columnName: String, id: Any, @@ -261,7 +231,7 @@ trait S2GraphLike extends Graph { val vertex = kvsMap.get(T.id.name()) match { case None => // do nothing - val id = nextLocalLongId + val id = localLongId.getAndIncrement() makeVertex(Long.box(id), kvsMap) case Some(idValue) if S2Property.validType(idValue) => makeVertex(idValue, kvsMap) @@ -269,7 +239,7 @@ trait S2GraphLike extends Graph { throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported 
} - addVertexInner(vertex) + addVertex(vertex.id, vertex.ts, vertex.props, vertex.op, vertex.belongLabelIds) vertex } @@ -290,17 +260,6 @@ trait S2GraphLike extends Graph { vertex } - def addVertexInner(vertex: S2VertexLike): S2VertexLike = { - val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets => - if (rets.forall(_.isSuccess)) { - indexProvider.mutateVerticesAsync(Seq(vertex)) - } else throw new RuntimeException("addVertex failed.") - } - Await.ready(future, WaitTimeout) - - vertex - } - /* tp3 only */ def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = { val containsId = kvs.contains(T.id) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala new file mode 100644 index 0000000..5ce4086 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala @@ -0,0 +1,98 @@ +package org.apache.s2graph.core + +import com.typesafe.config.Config +import org.apache.tinkerpop.gremlin.structure.Graph + +import scala.concurrent.ExecutionContext + + [email protected](value = Array( + new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD), + new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD) +)) [email protected](value = Array( + /* Process */ + /* branch: passed all. */ + /* filter */ + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."), + + /* map */ + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. 
fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. 
fix this."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."), + + /* sideEffect */ + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"), + + /* compliance */ + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = 
true."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."), + + /* Structure */ + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."), + + new Graph.OptOut(test = 
"org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. all label need to be pre-configured."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = 
"org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. 
not sure why."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = 
"shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"), + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed. + + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no") +)) +class S2GraphTp(config: Config)(override implicit val ec: ExecutionContext) extends S2Graph(config) { + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala index ad35efd..97e3095 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala @@ -118,7 +118,8 @@ trait S2VertexLike extends Vertex with GraphElement { props.put(key, newProps) // FIXME: save to persistent for tp test - graph.addVertexInner(this) +// graph.addVertexInner(this) + graph.addVertex(id, ts, props, op, belongLabelIds) newProps case _ => throw new RuntimeException("only single cardinality is supported.") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index 25b909e..7a8e63e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -1,14 +1,17 @@ package org.apache.s2graph.core +import java.util + import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey} import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection, VertexId} -import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId} +import org.apache.s2graph.core.utils.{Extensions, logger} +import org.apache.tinkerpop.gremlin.structure.Edge import scala.annotation.tailrec import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.concurrent.Future -import scala.util.Random +import scala.util.{Random, Try} object TraversalHelper { @tailrec @@ -125,6 +128,150 @@ object TraversalHelper { class TraversalHelper(graph: S2GraphLike) { import TraversalHelper._ + implicit val ec = graph.ec + val MaxRetryNum = graph.config.getInt("max.retry.number") + + def fallback = Future.successful(StepResult.Empty) + + def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + Try { + if (q.steps.isEmpty) fallback + else { + def fetch: Future[StepResult] = { + val startStepInnerResult = QueryResult.fromVertices(graph, q) + q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => + for { + prevStepInnerResult <- prevStepInnerResultFuture + currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) + } yield { + currentStepInnerResult.copy( + accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, + failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount + ) + } + } + } + + fetch + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def fetchStep(orgQuery: Query, + stepIdx: Int, + 
stepInnerResult: StepResult, + buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) + else { + val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = + graph.traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult) + + val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) + + graph.traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, + fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(graph.ec) + } + } + + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { + + val reqWithIdxs = queryRequests.zipWithIndex + val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) + val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => + for { + prev <- prevFuture + cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + } yield { + prev ++ reqWithIdxs.map(_._2).zip(cur).toMap + } + } + aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } + } + + def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { + val futures = queries.map(getEdgesStepInner(_, true)) + val future = for { + stepInnerResultLs <- Future.sequence(futures) + (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) + } yield { + // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") + (allDeleted, ret) + } + + Extensions.retryOnFailure(MaxRetryNum) { + future + } { + logger.error(s"fetch and deleteAll failed.") + (true, false) + } + + } + + def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], + requestTs: Long): Future[(Boolean, Boolean)] = { + stepInnerResultLs.foreach { stepInnerResult => + if (stepInnerResult.isFailure) 
throw new RuntimeException("fetched result is fallback.") + } + val futures = for { + stepInnerResult <- stepInnerResultLs + filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => + (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree + } + edgesToDelete = graph.elementBuilder.buildEdgesToDelete(filtered, requestTs) + if edgesToDelete.nonEmpty + } yield { + val head = edgesToDelete.head + val label = head.edge.innerLabel + val stepResult = StepResult(edgesToDelete, Nil, Nil, false) + val ret = label.schemaVersion match { + case HBaseType.VERSION3 | HBaseType.VERSION4 => + if (label.consistencyLevel == "strong") { + /* + * read: snapshotEdge on queryResult = O(N) + * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) + */ + graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) + } else { + graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + } + case _ => + + /* + * read: x + * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) + */ + graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + } + ret + } + + if (futures.isEmpty) { + // all deleted. 
+ Future.successful(true -> true) + } else { + Future.sequence(futures).map { rets => false -> rets.forall(identity) } + } + } + + def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = { + val queryParams = labelNameWithDirs.map { case (l, direction) => + QueryParam(labelName = l, direction = direction.toLowerCase) + } + + val query = Query.toQuery(Seq(vertex), queryParams) + val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) } + val ls = new util.ArrayList[Edge]() + fetches(queryRequests, Map.empty).map { stepResultLs => + stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge))) + ls.iterator() + } + } + def buildNextStepQueryRequests(orgQuery: Query, stepIdx: Int, stepInnerResult: StepResult) = { val edgeWithScoreLs = stepInnerResult.edgeWithScores http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 54b865a..55a3638 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -269,7 +269,7 @@ class RequestParser(graph: S2GraphLike) { id <- ids innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion) } yield { - graph.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis()) + graph.elementBuilder.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis()) } vertices http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 6ebc90c..945f246 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -36,6 +36,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike, type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) + val builder = graph.elementBuilder override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = { @@ -61,7 +62,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike, else { val label = Label.findById(labelWithDir.labelId) val schemaVer = label.schemaVersion - val srcVertex = graph.newVertex(srcVertexId, version) + val srcVertex = builder.newVertex(srcVertexId, version) var tsVal = version val isTallSchema = tallSchemaVersions(label.schemaVersion) val isDegree = if (isTallSchema) pos == kv.row.length else kv.qualifier.isEmpty @@ -71,13 +72,13 @@ class IndexEdgeDeserializable(graph: S2GraphLike, // val degreeVal = Bytes.toLong(kv.value) val degreeVal = bytesToLongFunc(kv.value, 0) val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer)) - val tgtVertex = graph.newVertex(tgtVertexId, version) - val edge = graph.newEdge(srcVertex, tgtVertex, + val tgtVertex = builder.newVertex(tgtVertexId, version) + val edge = graph.elementBuilder.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) edge.propertyInner(LabelMeta.timestamp.name, version, version) edge.propertyInner(LabelMeta.degree.name, 
degreeVal, version) - edge.tgtVertex = graph.newVertex(tgtVertexId, version) + edge.tgtVertex = builder.newVertex(tgtVertexId, version) edge.setOp(GraphUtil.defaultOpByte) edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer))) @@ -113,8 +114,8 @@ class IndexEdgeDeserializable(graph: S2GraphLike, else kv.qualifier(kv.qualifier.length - 1) } - val tgtVertex = graph.newVertex(tgtVertexIdRaw, version) - val edge = graph.newEdge(srcVertex, tgtVertex, + val tgtVertex = builder.newVertex(tgtVertexIdRaw, version) + val edge = graph.elementBuilder.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) @@ -150,7 +151,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike, if (edge.checkProperty(LabelMeta.to.name)) { val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs - val tgtVertex = graph.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version) + val tgtVertex = builder.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version) edge.setTgtVertex(tgtVertex) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 79e3f2e..e533b4b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -32,6 +32,7 @@ class 
IndexEdgeDeserializable(graph: S2GraphLike, type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) + val builder = graph.elementBuilder override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = { @@ -55,20 +56,20 @@ class IndexEdgeDeserializable(graph: S2GraphLike, else { val label = Label.findById(labelWithDir.labelId) val schemaVer = label.schemaVersion - val srcVertex = graph.newVertex(srcVertexId, version) + val srcVertex = builder.newVertex(srcVertexId, version) //TODO: var tsVal = version if (kv.qualifier.isEmpty) { val degreeVal = bytesToLongFunc(kv.value, 0) val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer)) - val tgtVertex = graph.newVertex(tgtVertexId, version) - val edge = graph.newEdge(srcVertex, tgtVertex, + val tgtVertex = builder.newVertex(tgtVertexId, version) + val edge = builder.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) edge.propertyInner(LabelMeta.timestamp.name, version, version) edge.propertyInner(LabelMeta.degree.name, degreeVal, version) - edge.tgtVertex = graph.newVertex(tgtVertexId, version) + edge.tgtVertex = builder.newVertex(tgtVertexId, version) edge.setOp(GraphUtil.defaultOpByte) edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer))) @@ -88,8 +89,8 @@ class IndexEdgeDeserializable(graph: S2GraphLike, if (kv.qualifier.length == pos) GraphUtil.defaultOpByte else kv.qualifier(kv.qualifier.length-1) - val tgtVertex = graph.newVertex(tgtVertexIdRaw, version) - val edge = graph.newEdge(srcVertex, tgtVertex, + val tgtVertex = builder.newVertex(tgtVertexIdRaw, version) + val edge = builder.newEdge(srcVertex, tgtVertex, label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index 
seq: ${label.id.get}, ${labelIdxSeq}")) @@ -124,7 +125,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike, /* process tgtVertexId */ if (edge.checkProperty(LabelMeta.to.name)) { val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs - val tgtVertex = graph.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version) + val tgtVertex = builder.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version) edge.setTgtVertex(tgtVertex) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala index 4c97f6e..b7f5ba1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -28,7 +28,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.storage.serde.Deserializable class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] { - + val builder = graph.elementBuilder def statusCodeWithOp(byte: Byte): (Byte, Byte) = { val statusCode = byte >> 4 val op = byte & ((1 << 4) - 1) @@ -89,15 +89,19 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) val pendingEdge = - graph.newEdge(graph.newVertex(srcVertexId, version), - graph.newVertex(tgtVertexId, version), + builder.newEdge( + builder.newVertex(srcVertexId, version), + builder.newVertex(tgtVertexId, version), label, labelWithDir.dir, 
pendingEdgeOp, version, pendingEdgeProps.toMap, statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal)) + Option(pendingEdge) } - val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts), + val snapshotEdge = builder.newSnapshotEdge( + builder.newVertex(srcVertexId, ts), + builder.newVertex(tgtVertexId, ts), label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode, pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala index a6a64a2..7dec6d9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -28,7 +28,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.storage.serde.Deserializable class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] { - + val builder = graph.elementBuilder def statusCodeWithOp(byte: Byte): (Byte, Byte) = { val statusCode = byte >> 4 val op = byte & ((1 << 4) - 1) @@ -55,7 +55,7 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap else { val label = Label.findById(labelWithDir.labelId) val schemaVer = label.schemaVersion - val srcVertex = graph.newVertex(srcVertexId, version) + val srcVertex = builder.newVertex(srcVertexId, version) val (tgtVertexId, 
_) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer) @@ -80,15 +80,18 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) val pendingEdge = - graph.newEdge(graph.newVertex(srcVertexId, version), - graph.newVertex(tgtVertexId, version), + builder.newEdge( + builder.newVertex(srcVertexId, version), + builder.newVertex(tgtVertexId, version), label, labelWithDir.dir, pendingEdgeOp, version, pendingEdgeProps.toMap, statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal)) Option(pendingEdge) } - val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts), + val snapshotEdge = builder.newSnapshotEdge( + builder.newVertex(srcVertexId, ts), + builder.newVertex(tgtVertexId, ts), label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode, pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala index 70160a8..5576017 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala @@ -28,6 +28,7 @@ import org.apache.s2graph.core.{S2Graph, S2GraphLike, S2Vertex, S2VertexLike} class VertexDeserializable(graph: S2GraphLike, bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2VertexLike] { + val builder = 
graph.elementBuilder def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = { try { @@ -47,7 +48,7 @@ class VertexDeserializable(graph: S2GraphLike, propsMap += (columnMeta -> innerVal) } - val vertex = graph.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil) + val vertex = builder.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil) S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) Option(vertex) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala index 002f577..d1d4d7d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala @@ -30,6 +30,7 @@ import scala.collection.mutable.ListBuffer class VertexDeserializable(graph: S2GraphLike, bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2VertexLike] { + val builder = graph.elementBuilder def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = { try { @@ -62,7 +63,7 @@ class VertexDeserializable(graph: S2GraphLike, } } assert(maxTs != Long.MinValue) - val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) + val vertex = builder.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) Option(vertex) 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala index 94883c9..2b439bc 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala @@ -66,14 +66,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { test("buildOperation") { val schemaVersion = "v2" val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) + val srcVertex = builder.newVertex(vertexId) val tgtVertex = srcVertex val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L @@ -93,14 +93,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { test("buildMutation: snapshotEdge: None with newProps") { val schemaVersion = "v2" val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) + val srcVertex = builder.newVertex(vertexId) val tgtVertex = srcVertex val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = builder.newEdge(srcVertex, tgtVertex, 
labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L @@ -120,14 +120,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") { val schemaVersion = "v2" val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) + val srcVertex = builder.newVertex(vertexId) val tgtVertex = srcVertex val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) val snapshotEdge = None val propsWithTs = Map(timestampProp) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L @@ -144,7 +144,7 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { test("buildMutation: All props older than snapshotEdge's LastDeletedAt") { val schemaVersion = "v2" val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) + val srcVertex = builder.newVertex(vertexId) val tgtVertex = srcVertex val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) @@ -159,12 +159,12 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) ) - val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) + val _snapshotEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) val snapshotEdge = Option(_snapshotEdge) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) @@ -178,7 +178,7 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") { val schemaVersion = "v2" val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion)) - val srcVertex = graph.newVertex(vertexId) + val srcVertex = builder.newVertex(vertexId) val tgtVertex = srcVertex val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1) @@ -193,11 +193,11 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3) ) - val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) + val _snapshotEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs) val snapshotEdge = Option(_snapshotEdge) - val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) + val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs) val newVersion = 0L val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala index 4614bed..6ac77e4 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala @@ -35,12 +35,13 @@ trait TestCommonWithModels { var graph: S2Graph = _ var config: Config = _ var management: Management = _ + var builder: GraphElementBuilder = _ def initTests() = { config = ConfigFactory.load() graph = new S2Graph(config)(ExecutionContext.Implicits.global) management = new Management(graph) - + builder = graph.elementBuilder implicit val session = AutoSession deleteTestLabel()
