add IndexProvider.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7ddc2762 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7ddc2762 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7ddc2762 Branch: refs/heads/master Commit: 7ddc276207a2ef8276d3f40067b64ac06876c48d Parents: e3472de Author: DO YUNG YOON <[email protected]> Authored: Tue Jul 11 18:48:53 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Jul 11 18:48:53 2017 +0900 ---------------------------------------------------------------------- .../core/io/tinkerpop/optimize/S2GraphStep.java | 59 +++++++++++++++++++- .../tinkerpop/optimize/S2GraphStepStrategy.java | 2 + .../org/apache/s2graph/core/QueryParam.scala | 4 +- .../s2graph/core/index/IndexProvider.scala | 27 +++++---- .../s2graph/core/index/IndexProviderTest.scala | 2 +- .../core/tinkerpop/structure/S2GraphTest.scala | 5 +- 6 files changed, 82 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java ---------------------------------------------------------------------- diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java index 4d6568b..384a88b 100644 --- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java +++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java @@ -1,27 +1,74 @@ package org.apache.s2graph.core.io.tinkerpop.optimize; +import org.apache.s2graph.core.EdgeId; +import org.apache.s2graph.core.QueryParam; +import org.apache.s2graph.core.S2Graph; +import org.apache.s2graph.core.index.IndexProvider; +import org.apache.s2graph.core.index.IndexProvider$; +import org.apache.s2graph.core.mysqls.Label; import org.apache.s2graph.core.utils.logger; import org.apache.tinkerpop.gremlin.process.traversal.Order; +import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder; import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep; import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer; import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import java.util.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; public class S2GraphStep<S, E extends Element> extends GraphStep<S, E> { private final List<HasContainer> hasContainers = new ArrayList<>(); + private void foldInHasContainers(final GraphStep<?, ?> originalStep) { + Step<?, ?> currentStep = originalStep.getNextStep(); + while (true) { + if (currentStep instanceof HasStep) { + hasContainers.addAll(((HasStep)currentStep).getHasContainers()); + } else if (currentStep instanceof IdentityStep) { + } else if (currentStep instanceof NoOpBarrierStep) { + + } else { + break; + } + + currentStep = currentStep.getNextStep(); + } + + } public S2GraphStep(final GraphStep<S, E> originalStep) { super(originalStep.getTraversal(), originalStep.getReturnClass(), originalStep.isStartStep(), originalStep.getIds()); + + foldInHasContainers(originalStep); originalStep.getLabels().forEach(this::addLabel); - System.err.println("[[S2GraphStep]]"); + // 1. build S2Graph QueryParams for this step. + // 2. graph.vertices(this.ids, queryParams) or graph.edges(this.ids, queryParams) + // 3. vertices/edges lookup indexProvider, then return Seq[EdgeId/VertexId] + + this.setIteratorSupplier(() -> { + final S2Graph graph = (S2Graph)traversal.asAdmin().getGraph().get(); + if (this.ids != null && this.ids.length > 0) { + + return iteratorList((Iterator)graph.vertices(this.ids)); + } + // full scan + + String queryString = IndexProvider$.MODULE$.buildQueryString(hasContainers); + + List<String> ids = graph.indexProvider().fetchIds(queryString); + return (Iterator) (Vertex.class.isAssignableFrom(this.returnClass) ? graph.vertices(ids) : graph.edges(ids)); + }); } @Override @@ -30,4 +77,14 @@ public class S2GraphStep<S, E extends Element> extends GraphStep<S, E> { super.toString() : StringFactory.stepString(this, Arrays.toString(this.ids), this.hasContainers); } + private <E extends Element> Iterator<E> iteratorList(final Iterator<E> iterator) { + final List<E> list = new ArrayList<E>(); + while (iterator.hasNext()) { + final E e = iterator.next(); + if (HasContainer.testAll(e, this.hasContainers)) + list.add(e); + } + return list.iterator(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java ---------------------------------------------------------------------- diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java index ea9ad7e..15d7699 100644 --- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java +++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java @@ -26,6 +26,8 @@ public class S2GraphStepStrategy extends AbstractTraversalStrategy<TraversalStra final S2GraphStep<?, ?> s2GraphStep = new S2GraphStep<>(originalGraphStep); TraversalHelper.replaceStep(originalGraphStep, (Step) s2GraphStep, traversal); + + } else { //Make sure that any provided "start" elements are instantiated in the current transaction // Object[] ids = originalGraphStep.getIds(); http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 5b8543a..7e95f58 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -28,8 +28,9 @@ import org.apache.s2graph.core.parsers.{Where, WhereParser} import org.apache.s2graph.core.rest.TemplateHelper import org.apache.s2graph.core.storage.StorageSerializable._ import org.apache.s2graph.core.types._ +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.hbase.async.ColumnRangeFilter -import play.api.libs.json.{JsString, JsNull, JsValue, Json} +import play.api.libs.json.{JsNull, JsString, JsValue, Json} import scala.util.{Success, Try} @@ -257,6 +258,7 @@ object QueryParam { val Delimiter = "," val maxMetaByte = (-1).toByte val fillArray = Array.fill(100)(maxMetaByte) + import scala.collection.JavaConverters._ def apply(labelWithDirection: LabelWithDirection): QueryParam = { val label = Label.findById(labelWithDirection.labelId) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index baf05d4..d3878af 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -11,6 +11,7 @@ import org.apache.s2graph.core.io.Conversions import org.apache.s2graph.core.{EdgeId, S2Edge} import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.types.InnerValLike +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import play.api.libs.json.Json object IndexProvider { @@ -23,11 +24,19 @@ object IndexProvider { case "lucene" => new LuceneIndexProvider(config) } } + + def buildQueryString(hasContainers: java.util.List[HasContainer]): String = { + import scala.collection.JavaConversions._ + hasContainers.map { container => + container.getKey + ":" + container.getValue + }.mkString(" AND ") + } + } trait IndexProvider { //TODO: Seq nee do be changed into stream - def fetchEdges(indexProps: Seq[(String, InnerValLike)]): Seq[EdgeId] + def fetchIds(queryString: String): java.util.List[String] def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] @@ -57,24 +66,22 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { edges.map(_ => true) } - override def fetchEdges(indexProps: Seq[(String, InnerValLike)]): Seq[EdgeId] = { - val queryStr = indexProps.map { case (name, value) => - name + ": " + value.toString() - }.mkString(" AND ") - - val q = new QueryParser(edgeIdField, analyzer).parse(queryStr) + override def fetchIds(queryString: String): java.util.List[String] = { + val ids = new java.util.ArrayList[String] + val q = new QueryParser(edgeIdField, analyzer).parse(queryString) val hitsPerPage = 10 val reader = DirectoryReader.open(directory) val searcher = new IndexSearcher(reader) val docs = searcher.search(q, hitsPerPage) - val ls = docs.scoreDocs.map { scoreDoc => + + docs.scoreDocs.foreach { scoreDoc => val document = searcher.doc(scoreDoc.doc) - Conversions.s2EdgeIdReads.reads(Json.parse(document.get(edgeIdField))).get + ids.add(document.get(edgeIdField)) } reader.close() - ls + ids } override def shutdown(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala index 8f484ad..70f7c3c 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala @@ -25,7 +25,7 @@ class IndexProviderTest extends IntegrateCommon { edges.foreach(e => logger.debug(s"[Edge]: $e")) indexProvider.mutateEdges(edges) - val edgeIds = indexProvider.fetchEdges(Seq("time" -> InnerVal.withLong(10, "v4"))) + val edgeIds = indexProvider.fetchIds("time: 10") edgeIds.foreach { edgeId => logger.debug(s"[EdgeId]: $edgeId") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala index d9aa3bc..1ef7890 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala @@ -463,10 +463,7 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { val e12 = v6.addEdge("created", v3, "weight", Double.box(0.2)) - val ls = graph.traversal().V().choose(new Predicate[Vertex] { - override def test(t: Vertex): Boolean = - t.label().equals("person") - }, out("knows"), in("created")).values("name").asAdmin() + val ls = graph.traversal().V().has("name", "josh") val l = ls.toList logger.error(s"[Size]: ${l.size}")
