add S2GraphLike.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6403bc9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6403bc9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6403bc9d Branch: refs/heads/master Commit: 6403bc9d69a1ec19824a9ee2f9779cb7c973b1dc Parents: e51289f Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 10 13:16:47 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Nov 10 15:42:25 2017 +0900 ---------------------------------------------------------------------- .../s2graph/core/GraphElementBuilder.scala | 2 +- .../org/apache/s2graph/core/Management.scala | 2 +- .../org/apache/s2graph/core/PostProcess.scala | 4 +- .../org/apache/s2graph/core/QueryResult.scala | 6 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 6 +- .../org/apache/s2graph/core/S2EdgeBuilder.scala | 2 +- .../org/apache/s2graph/core/S2EdgeLike.scala | 2 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 853 ++++++------------- .../org/apache/s2graph/core/S2GraphLike.scala | 151 +++- .../org/apache/s2graph/core/S2Vertex.scala | 2 +- .../apache/s2graph/core/S2VertexBuilder.scala | 2 +- .../org/apache/s2graph/core/S2VertexLike.scala | 2 +- .../s2graph/core/rest/RequestParser.scala | 2 +- .../apache/s2graph/core/rest/RestHandler.scala | 4 +- .../apache/s2graph/core/storage/Storage.scala | 2 +- .../apache/s2graph/core/storage/StorageIO.scala | 2 +- .../storage/WriteWriteConflictResolver.scala | 3 +- .../core/storage/hbase/AsynchbaseStorage.scala | 2 +- .../hbase/AsynchbaseStorageReadable.scala | 2 +- .../storage/hbase/AsynchbaseStorageSerDe.scala | 2 +- .../tall/IndexEdgeDeserializable.scala | 4 +- .../wide/IndexEdgeDeserializable.scala | 2 +- .../tall/SnapshotEdgeDeserializable.scala | 2 +- .../wide/SnapshotEdgeDeserializable.scala | 2 +- .../serde/vertex/VertexDeserializable.scala | 73 -- .../serde/vertex/VertexSerializable.scala | 62 -- .../vertex/tall/VertexDeserializable.scala | 4 +- .../vertex/wide/VertexDeserializable.scala | 4 +- .../core/tinkerpop/S2GraphProvider.scala | 2 +- 29 files changed, 438 insertions(+), 770 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala index 3ef7ba3..08da355 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala @@ -9,7 +9,7 @@ import play.api.libs.json.{JsObject, Json} import scala.util.Try -class GraphElementBuilder(graph: S2Graph) { +class GraphElementBuilder(graph: S2GraphLike) { def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { val parts = GraphUtil.split(s) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 49d3c0e..8d2d62a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -293,7 +293,7 @@ object Management { } } -class Management(graph: S2Graph) { +class Management(graph: S2GraphLike) { import Management._ import scala.collection.JavaConversions._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 7047214..462c1e4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -57,7 +57,7 @@ object PostProcess { case _ => Json.obj("message" -> ex.getMessage) } - def s2EdgeParent(graph: S2Graph, + def s2EdgeParent(graph: S2GraphLike, queryOption: QueryOption, parentEdges: Seq[EdgeWithScore]): JsValue = { if (parentEdges.isEmpty) JsNull @@ -193,7 +193,7 @@ object PostProcess { case _ => js } - def toJson(orgQuery: Option[JsValue])(graph: S2Graph, + def toJson(orgQuery: Option[JsValue])(graph: S2GraphLike, queryOption: QueryOption, stepResult: StepResult): JsValue = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index b654e71..7d187c6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -27,7 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Seq, mutable} object QueryResult { - def fromVertices(graph: S2Graph, vertices: Seq[S2VertexLike], queryParams: Seq[QueryParam]): StepResult = { + def fromVertices(graph: S2GraphLike, vertices: Seq[S2VertexLike], queryParams: Seq[QueryParam]): StepResult = { val edgeWithScores = vertices.flatMap { vertex => queryParams.map { queryParam => val label = queryParam.label @@ -44,7 +44,7 @@ object QueryResult { StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false) } - def fromVertices(graph: S2Graph, + def fromVertices(graph: S2GraphLike, query: Query): StepResult = { if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { StepResult.Empty @@ -301,7 +301,7 @@ object StepResult { } //TODO: Optimize this. - def filterOut(graph: S2Graph, + def filterOut(graph: S2GraphLike, queryOption: QueryOption, baseStepResult: StepResult, filterOutStepResult: StepResult): StepResult = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index db530ac..3e33ed4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -74,7 +74,7 @@ object SnapshotEdge { } } -case class SnapshotEdge(graph: S2Graph, +case class SnapshotEdge(graph: S2GraphLike, srcVertex: S2VertexLike, tgtVertex: S2VertexLike, label: Label, @@ -178,7 +178,7 @@ object IndexEdge { } } -case class IndexEdge(graph: S2Graph, +case class IndexEdge(graph: S2GraphLike, srcVertex: S2VertexLike, tgtVertex: S2VertexLike, label: Label, @@ -312,7 +312,7 @@ case class IndexEdge(graph: S2Graph, } } -case class S2Edge(override val innerGraph: S2Graph, +case class S2Edge(override val innerGraph: S2GraphLike, override val srcVertex: S2VertexLike, override var tgtVertex: S2VertexLike, override val innerLabel: Label, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala index 2ea1504..4004a13 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala @@ -46,7 +46,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) { } } - def copyEdge(innerGraph: S2Graph = edge.innerGraph, + def copyEdge(innerGraph: S2GraphLike = edge.innerGraph, srcVertex: S2VertexLike = edge.srcVertex, tgtVertex: S2VertexLike = edge.tgtVertex, innerLabel: Label = edge.innerLabel, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala index 0b40cf5..0165439 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala @@ -15,7 +15,7 @@ import scala.concurrent.Await import scala.collection.JavaConverters._ trait S2EdgeLike extends Edge with GraphElement { - val innerGraph: S2Graph + val innerGraph: S2GraphLike val srcVertex: S2VertexLike var tgtVertex: S2VertexLike val innerLabel: Label http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index fc1205d..f80d5c2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -121,11 +121,11 @@ object S2Graph { configuration } - def open(configuration: Configuration): S2Graph = { + def open(configuration: Configuration): S2GraphLike = { new S2Graph(configuration)(ec) } - def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage = { + def initStorage(graph: S2GraphLike, config: Config)(ec: ExecutionContext): Storage = { val storageBackend = config.getString("s2graph.storage.backend") logger.info(s"[InitStorage]: $storageBackend") @@ -158,130 +158,13 @@ object S2Graph { @Graph.OptOuts(value = Array( /* Process */ /* branch: passed all. */ -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.ChooseTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.OptionalTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.RepeatTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.UnionTest$Traversals", method = "*", reason = "no"), -// passed: all - - /* filter */ -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.AndTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.CoinTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.CyclicPathTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupTest$Traversals", method = "*", reason = "no"), -// passed: all - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."), -// passed: all, failed: g_V_properties_drop - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.IsTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.OrTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.SimplePathTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.WhereTest$Traversals", method = "*", reason = "no"), -// passed: all, /* map */ -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.AddEdgeTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.AddVertexTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CoalesceTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ConstantTest$Traversals", method = "*", reason = "no"), -// passed: all - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."), -// passed: all, failed: g_V_both_both_count, g_V_repeatXoutX_timesX3X_count, g_V_repeatXoutX_timesX8X_count - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.FlatMapTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.LoopsTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MapTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MapKeysTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MapValuesTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$CountMatchTraversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MatchTest$GreedyMatchTraversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanTest$Traversals", method = "*", reason = "no"), -// failed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.MinTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SumTest$Traversals", method = "*", reason = "no"), -// failed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.PathTest$Traversals", method = "*", reason = "no"), -// passed: all new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."), @@ -291,71 +174,15 @@ object S2Graph { new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."), -// failed: grateful_V_out_out_profileXmetricsX, g_V_repeat_both_profileXmetricsX, grateful_V_out_out_profile, g_V_repeat_both_profile - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProjectTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.SelectTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.UnfoldTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ValueMapTest$Traversals", method = "*", reason = "no"), -// passed: all /* sideEffect */ -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.AggregateTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTestV3d0$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupCountTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectTest$Traversals", method = "*", reason = "no"), -// passed: all - - // new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SackTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest$Traversals", method = "*", reason = "no"), -// passed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StoreTest$Traversals", method = "*", reason = "no"), -// passed: all - new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"), -// passed: all, failed: g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup, g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.TreeTest$Traversals", method = "*", reason = "no"), -// passed: all - /* compliance */ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."), -// passed: all new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."), -// failed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.TranslationStrategyProcessTest", method = "*", reason = "no"), -// passed: all new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), @@ -363,120 +190,56 @@ object S2Graph { new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."), -// failed: shouldGenerateDefaultIdOnAddVWithSpecifiedId, shouldGenerateDefaultIdOnAddVWithGeneratedCustomId, shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId, -// shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId, shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId, shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."), -// failed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategyProcessTest", method = "*", reason = "no"), -// passed: all new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."), -// failed: all - -// new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest", method = "*", reason = "no"), -// passed: all /* Structure */ - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateIdEquality", reason="reference equals on EdgeId is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateEquality", reason="reference equals on EdgeId is not supported."), - // passed: all, failed: none - -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest", method="*", reason="no"), - // passed: all, failed: none + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."), -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", method="*", reason="no"), - // passed: all, failed: none + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."), -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexPropertyTest", method="*", reason="no"), - // passed: all, failed: none + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."), -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", method="*", reason="no"), - // passed: all, failed: none + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method="shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason="S2Vertex.addEdge behave as upsert."), - // passed: , failed: shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. all label need to be pre-configured."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method="shouldNotEvaluateToEqualDifferentId", reason="reference equals is not supported."), - // passed: all, failed: shouldNotEvaluateToEqualDifferentId + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."), -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexTest", method="*", reason="no"), - // passed: all, failed: none + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."), -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedGraphTest", method="*", reason="no"), - // passed: all, failed: none, all ignored + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method="shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason="reference equals is not supported."), -// // passed: , failed: shouldNotBeEqualPropertiesAsThereIsDifferentKey + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. not sure why."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."), -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexPropertyTest", method="*", reason="no"), - // passed: all, failed: none + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="shouldRemoveVertices", reason="random label creation is not supported. all label need to be pre-configured."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason="Assigning the same ID to an Element update instead of throwing exception."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="shouldRemoveEdges", reason="random label creation is not supported. all label need to be pre-configured."), - // passed: , failed: + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"), + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method="shouldNotEvaluateToEqualDifferentId", reason="Assigning the same ID to an Element update instead of throwing exception."), - // passed: all, skip: shouldNotEvaluateToEqualDifferentId + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed. - -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexPropertyTest", method="*", reason="no"), - // passed: all, failed: none - -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceGraphTest", method="*", reason="no"), - // passed: all, failed: none, all ignored -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexTest", method="*", reason="no"), - // passed: all, failed: none, all ignored - -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.star.StarGraphTest", method="*", reason="no"), - // passed: all, - - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason="graphson-v2-embedded is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason="graphson-v2-embedded is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason="graphson-v2-embedded is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason="graphson-v2-embedded is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason="graphson-v2-embedded is not supported."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method="shouldGenerateDifferentGraph", specific="test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason="graphson-v2-embedded is not supported."), - // passed: all, except shouldGenerateDifferentGraph method. - - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method="*", reason="non-deterministic test."), - // all failed. - - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method="shouldSerializeTree", reason="order of children is reversed. not sure why."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method="shouldSerializeTraversalMetrics", reason="expected 2, actual 3."), - // passed: all, failed: $GryoTest.shouldSerializeTree - -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoCustomTest", method="*", reason="no"), - // all ignored. - -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoPropertyTest", method="*", reason="no"), - // all passed. - - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexWithBOTHEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexWithINEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteDetachedVertexAsReferenceNoEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexNoEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteVertexWithOUTEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method="shouldReadWriteDetachedVertexNoEdges", specific="graphson-v2-embedded", reason="Vertex.id() is deserialized as string, not class in graphson-v2-embedded."), - // passed: all, except graphson-v2-embedded. - - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method="shouldReadWriteDetachedEdgeAsReference", specific="graphson-v2-embedded", reason="no"), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method="shouldReadWriteEdge", specific="graphson-v2-embedded", reason="no"), - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method="shouldReadWriteDetachedEdge", specific="graphson-v2-embedded", reason="no"), - // passed: all, except graphson-v2-embedded. - - // TODO: - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method="*", reason="no"), // all failed. - - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoTest", method="*", reason="no") - // all failed. + new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no") )) class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike { - import S2Graph._ - var apacheConfiguration: Configuration = _ def dbSession() = scalikejdbc.AutoSession @@ -488,35 +251,30 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap private val running = new AtomicBoolean(true) - val config = _config.withFallback(S2Graph.DefaultConfig) + override val config = _config.withFallback(S2Graph.DefaultConfig) Model.apply(config) Model.loadCache() - val MaxRetryNum = config.getInt("max.retry.number") - val MaxBackOff = config.getInt("max.back.off") - val BackoffTimeout = config.getInt("back.off.timeout") - val DeleteAllFetchCount = config.getInt("delete.all.fetch.count") - val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") - val FailProb = config.getDouble("hbase.fail.prob") - val LockExpireDuration = config.getInt("lock.expire.time") - val MaxSize = config.getInt("future.cache.max.size") - val ExpireAfterWrite = config.getInt("future.cache.expire.after.write") - val ExpireAfterAccess = config.getInt("future.cache.expire.after.access") - val WaitTimeout = Duration(600, TimeUnit.SECONDS) - - val management = new Management(this) + override val management = new Management(this) - def getManagement() = management + override val indexProvider = IndexProvider.apply(config) - private val localLongId = new AtomicLong() + override val elementBuilder = new GraphElementBuilder(this) - def nextLocalLongId = localLongId.getAndIncrement() + override val traversalHelper = new TraversalHelper(this) private def confWithFallback(conf: Config): Config = { conf.withFallback(config) } + val defaultStorage: Storage = S2Graph.initStorage(this, config)(ec) + + for { + entry <- config.entrySet() if S2Graph.DefaultConfigs.contains(entry.getKey) + (k, v) = (entry.getKey, entry.getValue) + } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") + /** * TODO: we need to some way to handle malformed configuration for storage. */ @@ -553,31 +311,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap m } - val defaultStorage: Storage = S2Graph.initStorage(this, config)(ec) - - /** QueryLevel FutureCache */ - val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty) - - for { - entry <- config.entrySet() if S2Graph.DefaultConfigs.contains(entry.getKey) - (k, v) = (entry.getKey, entry.getValue) - } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") - - val indexProvider = IndexProvider.apply(config) - - val elementBuilder = new GraphElementBuilder(this) - - val traversalHelper = new TraversalHelper(this) - - def getStorage(service: Service): Storage = { + override def getStorage(service: Service): Storage = { storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) } - def getStorage(label: Label): Storage = { + override def getStorage(label: Label): Storage = { storagePool.getOrElse(s"label:${label.label}", defaultStorage) } - def flushStorage(): Unit = { + override def flushStorage(): Unit = { storagePool.foreach { case (_, storage) => /* flush is blocking */ @@ -585,9 +327,26 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } } - def fallback = Future.successful(StepResult.Empty) + override def shutdown(modelDataDelete: Boolean = false): Unit = + if (running.compareAndSet(true, false)) { + flushStorage() + Model.shutdown(modelDataDelete) + defaultStorage.shutdown() + localLongId.set(0l) + } + + override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = { + val verticesWithIdx = vertices.zipWithIndex + val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => + getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2))) + } + + Future.sequence(futures).map { ls => + ls.flatten.toSeq.sortBy(_._2).map(_._1) + } + } - def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] = { + override def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] = { val futures = for { edge <- edges } yield { @@ -602,9 +361,95 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } } - // def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges) + override def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { + def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], + withWait: Boolean = false): Future[Seq[MutateResponse]] = { + val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) } + Future.sequence(futures) + } + + val verticesWithIdx = vertices.zipWithIndex + val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => + mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) + } + Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } + } + + override def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { + val edgeWithIdxs = edges.zipWithIndex + + val (strongEdges, weakEdges) = + edgeWithIdxs.partition { case (edge, idx) => + val e = edge + e.innerLabel.consistencyLevel == "strong" && e.getOp() != GraphUtil.operations("insertBulk") + } + + val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => + val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => + val storage = getStorage(label) + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + + /* multiple edges with weak consistency level will be processed as batch */ + storage.mutateWeakEdges(zkQuorum, edges, withWait) + } + Future.sequence(futures) + } + val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") } + + val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.getDir(), edge.ts).map(idx -> _) + } + + val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + val storage = getStorage(label) + val zkQuorum = label.hbaseZkAddr + storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => + idxs.zip(rets) + } + } + + for { + weak <- Future.sequence(weakEdgesFutures) + deleteAll <- Future.sequence(deleteAllFutures) + strong <- Future.sequence(strongEdgesFutures) + } yield { + (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2)) + } + } + + override def mutateElements(elements: Seq[GraphElement], + withWait: Boolean = false): Future[Seq[MutateResponse]] = { + + val edgeBuffer = ArrayBuffer[(S2EdgeLike, Int)]() + val vertexBuffer = ArrayBuffer[(S2VertexLike, Int)]() + + elements.zipWithIndex.foreach { + case (e: S2EdgeLike, idx: Int) => edgeBuffer.append((e, idx)) + case (v: S2VertexLike, idx: Int) => vertexBuffer.append((v, idx)) + case any@_ => logger.error(s"Unknown type: ${any}") + } + + val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result => + edgeBuffer.map(_._2).zip(result) + } + + val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result => + vertexBuffer.map(_._2).zip(result) + } + + val graphFuture = for { + edgesMutated <- edgeFutureWithIdx + verticesMutated <- vertexFutureWithIdx + } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2) - def getEdges(q: Query): Future[StepResult] = { + graphFuture + + } + + override def getEdges(q: Query): Future[StepResult] = { Try { if (q.steps.isEmpty) { // TODO: this should be get vertex query. @@ -629,76 +474,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } get } - def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { - Try { - if (q.steps.isEmpty) fallback - else { - def fetch: Future[StepResult] = { - val startStepInnerResult = QueryResult.fromVertices(this, q) - q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => - for { - prevStepInnerResult <- prevStepInnerResultFuture - currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) - } yield { - currentStepInnerResult.copy( - accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, - failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount - ) - } - } - } - - fetch - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } - - def fetchStep(orgQuery: Query, - stepIdx: Int, - stepInnerResult: StepResult, - buildLastStepInnerResult: Boolean = false): Future[StepResult] = { - if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) - else { - val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = - traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult) - - val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) - - traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, - fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) - } - } - - - /** - * responsible to fire parallel fetch call into storage and create future that will return merged result. - * - * @param queryRequests - * @param prevStepEdges - * @return - */ - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { - - val reqWithIdxs = queryRequests.zipWithIndex - val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) - val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => - for { - prev <- prevFuture - cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) - } yield { - prev ++ reqWithIdxs.map(_._2).zip(cur).toMap - } - } - aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } - } - - - def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = { + override def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = { Try { if (mq.queries.isEmpty) fallback else { @@ -722,18 +498,63 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } get } - def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = { - val verticesWithIdx = vertices.zipWithIndex - val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2))) + override def deleteAllAdjacentEdges(srcVertices: Seq[S2VertexLike], + labels: Seq[Label], + dir: Int, + ts: Long): Future[Boolean] = { + val requestTs = ts + val vertices = srcVertices + /* create query per label */ + val queries = for { + label <- labels + } yield { + val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir), + offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw) + val step = Step(List(queryParam)) + Query(vertices, Vector(step)) + } + + val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { + fetchAndDeleteAll(queries, requestTs) + } { case (allDeleted, deleteSuccess) => + allDeleted && deleteSuccess + }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } + + retryFuture onFailure { + case ex => + logger.error(s"[Error]: deleteAllAdjacentEdges failed.") } + retryFuture + } + + override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { + val edgesWithIdx = edges.zipWithIndex + val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => + getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + } Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } } - def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = { + override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { + val label = edge.innerLabel + val storage = getStorage(label) + + storage.updateDegree(label.hbaseZkAddr, edge, degreeVal) + } + + override def getVertex(vertexId: VertexId): Option[S2VertexLike] = { + val v = newVertex(vertexId) + Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout) + } + + override def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = { + Await.result(fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout) + } + + override def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = { val labelNameWithDirs = if (labelNames.isEmpty) { // TODO: Let's clarify direction @@ -756,39 +577,71 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap fetchEdgesAsync(vertex, labelNameWithDirs.distinct) } - /** mutate */ - def deleteAllAdjacentEdges(srcVertices: Seq[S2VertexLike], - labels: Seq[Label], - dir: Int, - ts: Long): Future[Boolean] = { + def isRunning(): Boolean = running.get() - val requestTs = ts - val vertices = srcVertices - /* create query per label */ - val queries = for { - label <- labels - } yield { - val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir), - offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw) - val step = Step(List(queryParam)) - Query(vertices, Vector(step)) - } + /** Private **/ - val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { - fetchAndDeleteAll(queries, requestTs) - } { case (allDeleted, deleteSuccess) => - allDeleted && deleteSuccess - }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } + private def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + Try { + if (q.steps.isEmpty) fallback + else { + def fetch: Future[StepResult] = { + val startStepInnerResult = QueryResult.fromVertices(this, q) + q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => + for { + prevStepInnerResult <- prevStepInnerResultFuture + currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) + } yield { + currentStepInnerResult.copy( + accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, + failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount + ) + } + } + } - retryFuture onFailure { - case ex => - logger.error(s"[Error]: deleteAllAdjacentEdges failed.") + fetch + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + private def fetchStep(orgQuery: Query, + stepIdx: Int, + stepInnerResult: StepResult, + buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) + else { + val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = + traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult) + + val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) + + traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, + fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) } + } - retryFuture + private def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { + + val reqWithIdxs = queryRequests.zipWithIndex + val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) + val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => + for { + prev <- prevFuture + cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + } yield { + prev ++ reqWithIdxs.map(_._2).zip(cur).toMap + } + } + aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } } - def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { + private def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { val futures = queries.map(getEdgesStepInner(_, true)) val future = for { stepInnerResultLs <- Future.sequence(futures) @@ -807,8 +660,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } - def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], - requestTs: Long): Future[(Boolean, Boolean)] = { + private def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], + requestTs: Long): Future[(Boolean, Boolean)] = { stepInnerResultLs.foreach { stepInnerResult => if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") } @@ -853,162 +706,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } } - - def mutateElements(elements: Seq[GraphElement], - withWait: Boolean = false): Future[Seq[MutateResponse]] = { - - val edgeBuffer = ArrayBuffer[(S2EdgeLike, Int)]() - val vertexBuffer = ArrayBuffer[(S2VertexLike, Int)]() - - elements.zipWithIndex.foreach { - case (e: S2EdgeLike, idx: Int) => edgeBuffer.append((e, idx)) - case (v: S2VertexLike, idx: Int) => vertexBuffer.append((v, idx)) - case any@_ => logger.error(s"Unknown type: ${any}") - } - - val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result => - edgeBuffer.map(_._2).zip(result) - } - - val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result => - vertexBuffer.map(_._2).zip(result) - } - - val graphFuture = for { - edgesMutated <- edgeFutureWithIdx - verticesMutated <- vertexFutureWithIdx - } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2) - - graphFuture - - } - - def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { - val edgeWithIdxs = edges.zipWithIndex - - val (strongEdges, weakEdges) = - edgeWithIdxs.partition { case (edge, idx) => - val e = edge - e.innerLabel.consistencyLevel == "strong" && e.getOp() != GraphUtil.operations("insertBulk") - } - - val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => - val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => - val storage = getStorage(label) - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - - /* multiple edges with weak consistency level will be processed as batch */ - storage.mutateWeakEdges(zkQuorum, edges, withWait) - } - Future.sequence(futures) - } - val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") } - - val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.getDir(), edge.ts).map(idx -> _) - } - - val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - val storage = getStorage(label) - val zkQuorum = label.hbaseZkAddr - storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => - idxs.zip(rets) - } - } - - for { - weak <- Future.sequence(weakEdgesFutures) - deleteAll <- Future.sequence(deleteAllFutures) - strong <- Future.sequence(strongEdgesFutures) - } yield { - (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2)) - } - } - - def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { - def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], - withWait: Boolean = false): Future[Seq[MutateResponse]] = { - val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) } - Future.sequence(futures) - } - - val verticesWithIdx = vertices.zipWithIndex - val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) - } - Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } - } - - def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { - val edgesWithIdx = edges.zipWithIndex - val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) - } - Future.sequence(futures).map { ls => - ls.flatten.toSeq.sortBy(_._2).map(_._1) - } - } - - def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { - val label = edge.innerLabel - val storage = getStorage(label) - - storage.updateDegree(label.hbaseZkAddr, edge, degreeVal) - } - - def isRunning(): Boolean = running.get() - - def shutdown(modelDataDelete: Boolean = false): Unit = - if (running.compareAndSet(true, false)) { - flushStorage() - Model.shutdown(modelDataDelete) - defaultStorage.shutdown() - localLongId.set(0l) - } - - - def newEdge(srcVertex: S2VertexLike, - tgtVertex: S2VertexLike, - innerLabel: Label, - dir: Int, - op: Byte = GraphUtil.defaultOpByte, - version: Long = System.currentTimeMillis(), - propsWithTs: S2Edge.State, - parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[S2EdgeLike] = None, - pendingEdgeOpt: Option[S2EdgeLike] = None, - statusCode: Byte = 0, - lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = - elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, - parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - - def newVertexId(service: Service, - column: ServiceColumn, - id: Any): VertexId = - elementBuilder.newVertexId(service, column, id) - - def newVertex(id: VertexId, - ts: Long = System.currentTimeMillis(), - props: S2Vertex.Props = S2Vertex.EmptyProps, - op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = - elementBuilder.newVertex(id, ts, props, op, belongLabelIds) - - - def getVertex(vertexId: VertexId): Option[S2VertexLike] = { - val v = newVertex(vertexId) - Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout) - } - - def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = { - Await.result(fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout) - } - - def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = { + private def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = { val queryParams = labelNameWithDirs.map { case (l, direction) => QueryParam(labelName = l, direction = direction.toLowerCase) } @@ -1021,25 +719,4 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap ls.iterator() } } - - def toVertex(serviceName: String, - columnName: String, - id: Any, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): S2VertexLike = - elementBuilder.toVertex(serviceName, columnName, id, props, ts, operation) - - def toEdge(srcId: Any, - tgtId: Any, - labelName: String, - direction: String, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): S2EdgeLike = - elementBuilder.toEdge(srcId, tgtId, labelName, direction, props, ts, operation) - - def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = - elementBuilder.toGraphElement(s, labelMapping) - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index 03a92c6..a58f1e0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -1,31 +1,157 @@ package org.apache.s2graph.core + import java.util +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import com.typesafe.config.Config import org.apache.commons.configuration.Configuration import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName} import org.apache.s2graph.core.features.{S2Features, S2GraphVariables} -import org.apache.s2graph.core.mysqls.{Label, LabelMeta} -import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.index.IndexProvider +import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Service, ServiceColumn} +import org.apache.s2graph.core.storage.{MutateResponse, Storage} +import org.apache.s2graph.core.types.{InnerValLike, VertexId} import org.apache.tinkerpop.gremlin.process.computer.GraphComputer import org.apache.tinkerpop.gremlin.structure import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions -import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables} +import org.apache.tinkerpop.gremlin.structure.Graph.Variables import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper} -import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex} +import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Element, Graph, T, Transaction, Vertex} -import scala.concurrent.{Await, Future} import scala.collection.JavaConversions._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + trait S2GraphLike extends Graph { - this: S2Graph => + implicit val ec: ExecutionContext var apacheConfiguration: Configuration - private val s2Features = new S2Features + protected val localLongId = new AtomicLong() + + protected val s2Features = new S2Features + + val config: Config + + val management: Management + + val indexProvider: IndexProvider + + val elementBuilder: GraphElementBuilder + + val traversalHelper: TraversalHelper + + lazy val MaxRetryNum: Int = config.getInt("max.retry.number") + lazy val MaxBackOff: Int = config.getInt("max.back.off") + lazy val BackoffTimeout: Int = config.getInt("back.off.timeout") + lazy val DeleteAllFetchCount: Int = config.getInt("delete.all.fetch.count") + lazy val DeleteAllFetchSize: Int = config.getInt("delete.all.fetch.size") + lazy val FailProb: Double = config.getDouble("hbase.fail.prob") + lazy val LockExpireDuration: Int = config.getInt("lock.expire.time") + lazy val MaxSize: Int = config.getInt("future.cache.max.size") + lazy val ExpireAfterWrite: Int = config.getInt("future.cache.expire.after.write") + lazy val ExpireAfterAccess: Int = config.getInt("future.cache.expire.after.access") + lazy val WaitTimeout: Duration = Duration(600, TimeUnit.SECONDS) override def features() = s2Features + def nextLocalLongId = localLongId.getAndIncrement() + + def fallback = Future.successful(StepResult.Empty) + + def defaultStorage: Storage + + def getStorage(service: Service): Storage + + def getStorage(label: Label): Storage + + def flushStorage(): Unit + + def shutdown(modelDataDelete: Boolean = false): Unit + + def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] + + def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] + + def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] + + def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] + + def mutateElements(elements: Seq[GraphElement], + withWait: Boolean = false): Future[Seq[MutateResponse]] + + def getEdges(q: Query): Future[StepResult] + + def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] + + def deleteAllAdjacentEdges(srcVertices: Seq[S2VertexLike], + labels: Seq[Label], + dir: Int, + ts: Long): Future[Boolean] + + def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] + + def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] + + def getVertex(vertexId: VertexId): Option[S2VertexLike] + + def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] + + def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] + + /** Convert to Graph Element **/ + def newEdge(srcVertex: S2VertexLike, + tgtVertex: S2VertexLike, + innerLabel: Label, + dir: Int, + op: Byte = GraphUtil.defaultOpByte, + version: Long = System.currentTimeMillis(), + propsWithTs: S2Edge.State, + parentEdges: Seq[EdgeWithScore] = Nil, + originalEdgeOpt: Option[S2EdgeLike] = None, + pendingEdgeOpt: Option[S2EdgeLike] = None, + statusCode: Byte = 0, + lockTs: Option[Long] = None, + tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = + elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + + def newVertexId(service: Service, + column: ServiceColumn, + id: Any): VertexId = + elementBuilder.newVertexId(service, column, id) + + def newVertex(id: VertexId, + ts: Long = System.currentTimeMillis(), + props: S2Vertex.Props = S2Vertex.EmptyProps, + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = + elementBuilder.newVertex(id, ts, props, op, belongLabelIds) + + def toVertex(serviceName: String, + columnName: String, + id: Any, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2VertexLike = + elementBuilder.toVertex(serviceName, columnName, id, props, ts, operation) + + def toEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2EdgeLike = + elementBuilder.toEdge(srcId, tgtId, labelName, direction, props, ts, operation) + + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = + elementBuilder.toGraphElement(s, labelMapping) + + /** TinkerPop Interfaces **/ def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = { val fetchVertices = ids.lastOption.map { lastParam => if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean] @@ -38,9 +164,9 @@ trait S2GraphLike extends Graph { } else { val vertices = ids.collect { case s2Vertex: S2VertexLike => s2Vertex - case vId: VertexId => newVertex(vId) - case vertex: Vertex => newVertex(vertex.id().asInstanceOf[VertexId]) - case other@_ => newVertex(VertexId.fromString(other.toString)) + case vId: VertexId => elementBuilder.newVertex(vId) + case vertex: Vertex => elementBuilder.newVertex(vertex.id().asInstanceOf[VertexId]) + case other@_ => elementBuilder.newVertex(VertexId.fromString(other.toString)) } if (fetchVertices) { @@ -153,7 +279,7 @@ trait S2GraphLike extends Graph { props: S2Vertex.Props = S2Vertex.EmptyProps, op: Byte = 0, belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { - val vertex = newVertex(id, ts, props, op, belongLabelIds) + val vertex = elementBuilder.newVertex(id, ts, props, op, belongLabelIds) val future = mutateVertices(Seq(vertex), withWait = true).map { rets => if (rets.forall(_.isSuccess)) vertex @@ -201,7 +327,7 @@ trait S2GraphLike extends Graph { val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) + val edge = elementBuilder.newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets => indexProvider.mutateEdgesAsync(Seq(edge)) @@ -221,6 +347,7 @@ trait S2GraphLike extends Graph { shutdown() } + def compute[C <: GraphComputer](aClass: Class[C]): C = ??? def compute(): GraphComputer = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala index 96e7afe..954bab0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala @@ -29,7 +29,7 @@ import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality import scala.collection.JavaConverters._ -case class S2Vertex(graph: S2Graph, +case class S2Vertex(graph: S2GraphLike, id: VertexId, ts: Long = System.currentTimeMillis(), props: Props = S2Vertex.EmptyProps, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala index e0205c5..399b142 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala @@ -15,7 +15,7 @@ class S2VertexBuilder(vertex: S2VertexLike) { default } - def copyVertex(graph: S2Graph = vertex.graph, + def copyVertex(graph: S2GraphLike = vertex.graph, id: VertexId = vertex.id, ts: Long = vertex.ts, props: Props = vertex.props, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala index 29db49d..ad35efd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala @@ -13,7 +13,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.Await trait S2VertexLike extends Vertex with GraphElement { - val graph: S2Graph + val graph: S2GraphLike val id: VertexId val ts: Long val props: Props http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 6afbd87..54b865a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -118,7 +118,7 @@ object RequestParser { } -class RequestParser(graph: S2Graph) { +class RequestParser(graph: S2GraphLike) { import Management.JsonModel._ import RequestParser._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala index 1a85dba..460c627 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -54,7 +54,7 @@ object RestHandler { * Public API, only return Future.successful or Future.failed * Don't throw exception */ -class RestHandler(graph: S2Graph)(implicit ec: ExecutionContext) { +class RestHandler(graph: S2GraphLike)(implicit ec: ExecutionContext) { import RestHandler._ val requestParser = new RequestParser(graph) @@ -172,7 +172,7 @@ class RestHandler(graph: S2Graph)(implicit ec: ExecutionContext) { } def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None) - (post: (S2Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = { + (post: (S2GraphLike, QueryOption, StepResult) => JsValue): Future[JsValue] = { def query(obj: JsValue): Future[JsValue] = { (obj \ "queries").asOpt[JsValue] match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 2a8f1e2..18e6fa1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -28,7 +28,7 @@ import org.apache.s2graph.core.types._ import scala.concurrent.{ExecutionContext, Future} -abstract class Storage(val graph: S2Graph, +abstract class Storage(val graph: S2GraphLike, val config: Config) { /* Storage backend specific resource management */ val management: StorageManagement http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index 1b9c94b..2d74a7c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -26,7 +26,7 @@ import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.parsers.WhereParser import org.apache.s2graph.core.utils.logger -class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { +class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) { val dummyCursor: Array[Byte] = Array.empty /** Parsing Logic: parse from kv from Storage into Edge */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index af0d53d..dcef1cc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -28,12 +28,11 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Random -class WriteWriteConflictResolver(graph: S2Graph, +class WriteWriteConflictResolver(graph: S2GraphLike, serDe: StorageSerDe, io: StorageIO, mutator: StorageWritable, fetcher: StorageReadable) { - val BackoffTimeout = graph.BackoffTimeout val MaxRetryNum = graph.MaxRetryNum val MaxBackOff = graph.MaxBackOff http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index ef1350a..8b3d862 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -138,7 +138,7 @@ object AsynchbaseStorage { } -class AsynchbaseStorage(override val graph: S2Graph, +class AsynchbaseStorage(override val graph: S2GraphLike, override val config: Config) extends Storage(graph, config) { /** http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index 5f54e47..af82439 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -38,7 +38,7 @@ import org.hbase.async._ import scala.collection.JavaConversions._ import scala.concurrent.{ExecutionContext, Future} -class AsynchbaseStorageReadable(val graph: S2Graph, +class AsynchbaseStorageReadable(val graph: S2GraphLike, val config: Config, val client: HBaseClient, val serDe: StorageSerDe, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6403bc9d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala index bb47e3b..c9a7dd3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala @@ -23,7 +23,7 @@ import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core._ import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe, serde} -class AsynchbaseStorageSerDe(val graph: S2Graph) extends StorageSerDe { +class AsynchbaseStorageSerDe(val graph: S2GraphLike) extends StorageSerDe { import org.apache.s2graph.core.types.HBaseType._ /**
