Repository: incubator-s2graph Updated Branches: refs/heads/master 701f0eeac -> 08d6a3edd
make step works Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6b088944 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6b088944 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6b088944 Branch: refs/heads/master Commit: 6b088944b877b25775ac424658ad9b6472203082 Parents: 701f0ee Author: daewon <[email protected]> Authored: Mon Jul 2 15:28:17 2018 +0900 Committer: daewon <[email protected]> Committed: Mon Jul 2 15:28:17 2018 +0900 ---------------------------------------------------------------------- .gitignore | 1 + s2core/build.sbt | 3 +- .../org/apache/s2graph/core/step/IStep.scala | 58 ++++++++++ .../s2graph/core/step/GraphStepTest.scala | 110 +++++++++++++++++++ .../org/apache/s2graph/core/step/StepTest.scala | 78 +++++++++++++ 5 files changed, 249 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index ad5a5e9..0066e18 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ .cache .history .lib/ +*/lib/ var/* dist/* target/ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 229bf41..0368715 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -57,7 +57,8 @@ libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(), "org.scala-lang.modules" %% "scala-pickling" % "0.10.1", "net.pishen" %% "annoy4s" % annoy4sVersion, - "org.tensorflow" % "tensorflow" % tensorflowVersion + "org.tensorflow" % "tensorflow" % tensorflowVersion, + "io.reactivex" %% "rxscala" % "0.26.5" ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala new file mode 100644 index 0000000..e0e23f6 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala @@ -0,0 +1,58 @@ +package org.apache.s2graph.core.step + +import org.apache.s2graph.core._ +import rx.lang.scala.Observable + +import scala.language.higherKinds +import scala.language.existentials + +trait RxStep[-A, +B] extends (A => Observable[B]) + +object RxStep { + + case class VertexFetchStep(g: S2GraphLike) extends RxStep[Seq[S2VertexLike], S2VertexLike] { + override def apply(vertices: Seq[S2VertexLike]): Observable[S2VertexLike] = { + Observable.from(vertices) + } + } + + case class EdgeFetchStep(g: S2GraphLike, qp: QueryParam) extends RxStep[S2VertexLike, S2EdgeLike] { + override def apply(v: S2VertexLike): Observable[S2EdgeLike] = { + implicit val ec = g.ec + + val step = org.apache.s2graph.core.Step(Seq(qp)) + val q = Query(Seq(v), steps = Vector(step)) + + val f = g.getEdges(q).map { stepResult => + val edges = stepResult.edgeWithScores.map(_.edge) + Observable.from(edges) + } + + Observable.from(f).flatten + } + } + + private def merge[A, B](steps: RxStep[A, B]*): RxStep[A, B] = new RxStep[A, B] { + override def apply(in: A): Observable[B] = + steps.map(_.apply(in)).toObservable.flatten + } + + def toObservable(q: Query)(implicit graph: S2GraphLike): Observable[S2EdgeLike] = { + val v1: Observable[S2VertexLike] = VertexFetchStep(graph).apply(q.vertices) + + val serialSteps = q.steps.map { step => + val parallelSteps = step.queryParams.map(qp => EdgeFetchStep(graph, qp)) + merge(parallelSteps: _*) + } + + v1.flatMap { v => + val initOpt = serialSteps.headOption.map(_.apply(v)) + + initOpt.map { init => + serialSteps.tail.foldLeft(init) { case (prev, next) => + prev.map(_.tgtForVertex).flatMap(next) + } + }.getOrElse(Observable.empty) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala new file mode 100644 index 0000000..96e49a0 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala @@ -0,0 +1,110 @@ +package org.apache.s2graph.core.step + +import org.apache.s2graph.core.Integrate.IntegrateCommon +import org.apache.s2graph.core._ +import org.apache.s2graph.core.parsers.Where +import org.apache.s2graph.core.rest.RequestParser +import play.api.libs.json.Json +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + +class GraphStepTest extends IntegrateCommon { + + import TestUtil._ + import RxStep._ + + val insert = "insert" + val e = "e" + val weight = "weight" + val is_hidden = "is_hidden" + + override def initTestData(): Unit = { + super.initTestData() + + insertEdgesSync( + toEdge(1000, insert, e, 1, 10, testLabelName), + toEdge(1000, insert, e, 2, 20, testLabelName), + toEdge(1000, insert, e, 3, 30, testLabelName), + + toEdge(1000, insert, e, 100, 1, testLabelName, Json.obj(weight -> 30)) + ) + } + + test("basic compose") { + val vertices = Seq( + graph.toVertex(testServiceName, testColumnName, 1), + graph.toVertex(testServiceName, testColumnName, 2), + graph.toVertex(testServiceName, testColumnName, 3), + + graph.toVertex(testServiceName, testColumnName, 10) + ) + + val v1 = VertexFetchStep(graph) + + val qpIn = QueryParam(labelName = testLabelName, direction = "in") + val qpOut = QueryParam(labelName = testLabelName, direction = "out") + + val e1 = EdgeFetchStep(graph, qpIn) + val e2 = EdgeFetchStep(graph, qpOut) + + val where = Where("_to = 20").get + + val q = + v1.apply(vertices) // vertices: 4 - (1, 2, 3, 10) + .flatMap(e1) // edges: 4 - (srcId = 1, 2, 3 and tgtId = 10) + .filter(where.filter) // filterOut (only _to == 20) + .map(_.tgtForVertex) // vertices: (20) + .flatMap(v => e1.apply(v) ++ e2.apply(v)) // edges: (tgtId = 20) + + val res = q.toBlocking.toList + } + + test("Query to RxSteps") { + def q(id: Int) = Json.parse( + s""" + { + "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 10 + }, + { + "label": "$testLabelName", + "direction": "in", + "offset": 0, + "limit": 10 + }], + + [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 10, + "where": "weight > 10" + }, + { + "label": "$testLabelName", + "direction": "in", + "offset": 0, + "limit": 10 + }] + ] + }""") + + val queryJs = q(1) + val requestParser = new RequestParser(graph) + val query = requestParser.toQuery(queryJs, None) + + val actual = RxStep.toObservable(query)(graph).toBlocking.toList.sortBy(_.srcVertex.innerIdVal.toString) + val expected = Await.result(graph.getEdges(query), Duration("30 sec")).edgeWithScores.map(_.edge).sortBy(_.srcVertex.innerIdVal.toString) + + actual shouldBe expected + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala new file mode 100644 index 0000000..f29a346 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala @@ -0,0 +1,78 @@ +package org.apache.s2graph.core.step + +import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers} +import rx.lang.scala.{Observable, Subscription} + +class StepTest extends FunSuite with Matchers { + + trait GraphE { + def id: String + } + + case class V(id: String) extends GraphE + + case class E(id: String, src: V, tgt: V) extends GraphE + + object GraphModels { + /** + * vertices: [A, B] + * edges: [E(A, B), E(B, A)] + */ + val va = V("V_A") + val vb = V("V_B") + + val e1 = E("E1", va, vb) + val e2 = E("E2", vb, va) + + val allVertices = List(va, vb) + val allEdges = List(e1, e2) + } + + case class VertexStep(vid: String) extends RxStep[Unit, V] { + override def apply(in: Unit): Observable[V] = { + val vertices = GraphModels.allVertices.filter(v => vid == v.id) + Observable.from(vertices) + } + } + + case class EdgeStep(dir: String) extends RxStep[V, E] { + override def apply(in: V): Observable[E] = { + val edges = if (dir == "OUT") { + GraphModels.allEdges.filter(e => in == e.src) + } else { + GraphModels.allEdges.filter(e => in == e.tgt) + } + + Observable.from(edges) + } + } + + case class EdgeToVertexStep() extends RxStep[E, V] { + override def apply(in: E): Observable[V] = { + Observable.just(in.tgt) + } + } + + test("basic step") { + val v1: RxStep[Unit, V] = VertexStep("V_A") + + val e1: RxStep[V, E] = EdgeStep("OUT") + val e2 = EdgeStep("IN") + + val g = v1(()) + .flatMap(v => e1(v) ++ e2(v)) + .flatMap(EdgeToVertexStep()) + .flatMap(v => e1(v) ++ e2(v)) + .distinct + + val expected = List( + E("E1", V("V_A"), V("V_B")), + E("E2", V("V_B"), V("V_A")) + ).sortBy(_.id) + + val actual = g.toBlocking.toList.sortBy(_.id) + + println(actual) + actual shouldBe expected + } +}
