move constants from IndexProvider to GlobalIndex.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cc713578 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cc713578 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cc713578 Branch: refs/heads/master Commit: cc71357825dc5b9cafc765b922fe3083fb6e2119 Parents: 30bf575 3725464 Author: DO YUNG YOON <[email protected]> Authored: Sat Jul 29 08:04:01 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Jul 29 08:04:01 2017 +0900 ---------------------------------------------------------------------- .travis/install-hbase.sh | 2 +- .../scala/org/apache/s2graph/core/Management.scala | 2 +- .../apache/s2graph/core/index/IndexProvider.scala | 9 ++------- .../apache/s2graph/core/mysqls/GlobalIndex.scala | 16 ++++++++++++++-- .../s2graph/core/tinkerpop/S2GraphProvider.scala | 6 +----- 5 files changed, 19 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/.travis/install-hbase.sh ---------------------------------------------------------------------- diff --cc .travis/install-hbase.sh index f55437c,f55437c..f2ba5d3 --- a/.travis/install-hbase.sh +++ b/.travis/install-hbase.sh @@@ -17,5 -17,5 +17,5 @@@ set -xe if [ ! -d "$HOME/hbase-$HBASE_VERSION/bin" ]; then -- cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-$HBASE_VERSION-bin.tar.gz | tar xz ++ cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-*-bin.tar.gz | tar xz fi http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --cc s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index 7f2602f,0000000..57f384c mode 100644,000000..100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@@ -1,325 -1,0 +1,320 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.index + +import java.util + +import com.typesafe.config.Config +import org.apache.lucene.analysis.standard.StandardAnalyzer +import org.apache.lucene.document._ +import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} +import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} +import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.store.{BaseDirectory, RAMDirectory} +import org.apache.s2graph.core.io.Conversions +import org.apache.s2graph.core.{EdgeId, S2Edge, S2Vertex} +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.types.{InnerValLike, VertexId} +import org.apache.s2graph.core.utils.logger +import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains, P} +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer +import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP} +import org.apache.tinkerpop.gremlin.structure.T +import play.api.libs.json.Json + +import scala.concurrent.Future + +object IndexProvider { - val vidField = "_vid_" - val eidField = "_eid_" - val labelField = "_label_" - val serviceField = "_service_" - val serviceColumnField = "_serviceColumn_" - - val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField) ++ import GlobalIndex._ + val hitsPerPage = 100000 + + def apply(config: Config): IndexProvider = { + val indexProviderType = "lucene" +// if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene" + + indexProviderType match { + case "lucene" => new LuceneIndexProvider(config) + } + } + + def buildQuerySingleString(container: HasContainer): String = { + import scala.collection.JavaConversions._ + + val key = if (container.getKey == T.label.getAccessor) labelField else container.getKey + val value = container.getValue + + container.getPredicate match { + case and: AndP[_] => + val buffer = scala.collection.mutable.ArrayBuffer.empty[String] + and.getPredicates.foreach { p => + buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p))) + } + buffer.mkString("(", " AND ", ")") + case or: OrP[_] => + val buffer = scala.collection.mutable.ArrayBuffer.empty[String] + or.getPredicates.foreach { p => + buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p))) + } + buffer.mkString("(", " OR ", ")") + case _ => + val biPredicate = container.getBiPredicate + biPredicate match { + + case Contains.within => + key + ":(" + value.asInstanceOf[util.Collection[_]].toSeq.mkString(" OR ") + ")" + case Contains.without => + "NOT " + key + ":(" + value.asInstanceOf[util.Collection[_]].toSeq.mkString(" AND ") + ")" + case Compare.eq => s"${key}:${value}" + case Compare.gt => s"(${key}:[${value} TO *] AND NOT ${key}:${value})" + case Compare.gte => s"${key}:[${value} TO *]" + case Compare.lt => s"${key}:[* TO ${value}]" + case Compare.lte => s"(${key}:[* TO ${value}] OR ${key}:${value})" + case Compare.neq => s"NOT ${key}:${value}" + case _ => throw new IllegalArgumentException("not supported yet.") + } + } + } + + def buildQueryString(hasContainers: java.util.List[HasContainer]): String = { + import scala.collection.JavaConversions._ + val builder = scala.collection.mutable.ArrayBuffer.empty[String] + + hasContainers.foreach { container => + container.getPredicate match { + case and: AndP[_] => + val buffer = scala.collection.mutable.ArrayBuffer.empty[String] + and.getPredicates.foreach { p => + buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p))) + } + builder.append(buffer.mkString("(", " AND ", ")")) + case or: OrP[_] => + val buffer = scala.collection.mutable.ArrayBuffer.empty[String] + or.getPredicates.foreach { p => + buffer.append(buildQuerySingleString(new HasContainer(container.getKey, p))) + } + builder.append(buffer.mkString("(", " OR ", ")")) + case _ => + builder.append(buildQuerySingleString(container)) + } + } + + builder.mkString(" AND ") + } +} + +trait IndexProvider { + //TODO: Seq nee do be changed into stream + def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] + def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[EdgeId]] + + def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] + def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]] + + def mutateVertices(vertices: Seq[S2Vertex]): Seq[Boolean] + def mutateVerticesAsync(vertices: Seq[S2Vertex]): Future[Seq[Boolean]] + + def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] + def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] + + def shutdown(): Unit +} + +class LuceneIndexProvider(config: Config) extends IndexProvider { + import IndexProvider._ + import scala.collection.mutable + import scala.collection.JavaConverters._ ++ import GlobalIndex._ + + val analyzer = new StandardAnalyzer() + val writers = mutable.Map.empty[String, IndexWriter] + val directories = mutable.Map.empty[String, BaseDirectory] + + private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = { + writers.getOrElseUpdate(indexName, { + val dir = directories.getOrElseUpdate(indexName, new RAMDirectory()) + val indexConfig = new IndexWriterConfig(analyzer) + new IndexWriter(dir, indexConfig) + }) + } + + private def toDocument(globalIndex: GlobalIndex, vertex: S2Vertex): Option[Document] = { + val props = vertex.props.asScala + val exist = props.exists(t => globalIndex.propNamesSet(t._1)) + if (!exist) None + else { + val doc = new Document() + val id = vertex.id.toString + + doc.add(new StringField(vidField, id, Field.Store.YES)) + doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES)) + doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES)) + + props.foreach { case (dim, s2VertexProperty) => + val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO + val field = s2VertexProperty.columnMeta.dataType match { + case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + } + doc.add(field) + } + + Option(doc) + } + } + + private def toDocument(globalIndex: GlobalIndex, edge: S2Edge): Option[Document] = { + val props = edge.propsWithTs.asScala + val exist = props.exists(t => globalIndex.propNamesSet(t._1)) + if (!exist) None + else { + val doc = new Document() + val id = edge.edgeId.toString + + doc.add(new StringField(eidField, id, Field.Store.YES)) + doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES)) + doc.add(new StringField(labelField, edge.label(), Field.Store.YES)) + + props.foreach { case (dim, s2Property) => + val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO + val field = s2Property.labelMeta.dataType match { + case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) + } + doc.add(field) + } + + Option(doc) + } + } + + override def mutateVertices(vertices: Seq[S2Vertex]): Seq[Boolean] = { + val globalIndexOptions = GlobalIndex.findAll() + + globalIndexOptions.map { globalIndex => + val writer = getOrElseCreateIndexWriter(globalIndex.indexName) + + vertices.foreach { vertex => + toDocument(globalIndex, vertex).foreach { doc => + writer.addDocument(doc) + } + } + + writer.commit() + } + + vertices.map(_ => true) + } + + override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = { + val globalIndexOptions = GlobalIndex.findAll() + + globalIndexOptions.map { globalIndex => + val writer = getOrElseCreateIndexWriter(globalIndex.indexName) + + edges.foreach { edge => + toDocument(globalIndex, edge).foreach { doc => + writer.addDocument(doc) + } + } + + writer.commit() + } + + edges.map(_ => true) + } + + override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { + val field = eidField + val ids = new java.util.HashSet[EdgeId] + + GlobalIndex.findGlobalIndex(hasContainers).map { globalIndex => + val queryString = buildQueryString(hasContainers) + + try { + val q = new QueryParser(field, analyzer).parse(queryString) + + val reader = DirectoryReader.open(directories(globalIndex.indexName)) + val searcher = new IndexSearcher(reader) + + val docs = searcher.search(q, hitsPerPage) + + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get + ids.add(id); + } + + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + ids + } + } + + new util.ArrayList[EdgeId](ids) + } + + override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { + val field = vidField + val ids = new java.util.HashSet[VertexId] + + GlobalIndex.findGlobalIndex(hasContainers).map { globalIndex => + val queryString = buildQueryString(hasContainers) + + try { + val q = new QueryParser(field, analyzer).parse(queryString) + + val reader = DirectoryReader.open(directories(globalIndex.indexName)) + val searcher = new IndexSearcher(reader) + + val docs = searcher.search(q, hitsPerPage) + + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get + ids.add(id) + } + + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + ids + } + } + + new util.ArrayList[VertexId](ids) + } + + override def shutdown(): Unit = { + writers.foreach { case (_, writer) => writer.close() } + } + + override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) + + override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) + + override def mutateVerticesAsync(vertices: Seq[S2Vertex]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices)) + + override def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala ---------------------------------------------------------------------- diff --cc s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala index bb18949,1d1dfe2..347c083 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala @@@ -6,9 -6,9 +6,15 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at ++<<<<<<< HEAD + * + * http://www.apache.org/licenses/LICENSE-2.0 + * ++======= + * + * http://www.apache.org/licenses/LICENSE-2.0 + * ++>>>>>>> S2GRAPH-152 * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cc713578/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --cc s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index 3157e18,87eac0e..d8367d7 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@@ -19,9 -19,9 +19,6 @@@ package org.apache.s2graph.core.tinkerpop --import java.util -- --import com.typesafe.config.ConfigFactory import org.apache.commons.configuration.Configuration import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.Management.JsonModel.Prop @@@ -34,7 -33,7 +30,7 @@@ import org.apache.s2graph.core.utils.lo import org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData import org.apache.tinkerpop.gremlin.structure.{Element, Graph, T} import org.apache.tinkerpop.gremlin.{AbstractGraphProvider, LoadGraphWith} -- ++import java.util import scala.collection.JavaConverters._ object S2GraphProvider {
