[S2GRAPH-121]: Create `Result` class to hold traverse result edges.

JIRA:
    [S2GRAPH-121] https://issues.apache.org/jira/browse/S2GRAPH-121
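For orientation, the following is a minimal usage sketch of the API surface this change introduces. It is illustrative only: it assumes an initialized Graph, a pre-built Query, an implicit ExecutionContext, and an existing label (the name "friends" below is a placeholder, as is the surrounding object). The calls it exercises (Graph.getEdges returning a StepResult, the per-result s2Edge/score accessors, Edge.selectValues, and the Edge.toEdge factory that takes over from the removed Management.toEdge) are taken from the diff that follows.

    import org.apache.s2graph.core.{Edge, Graph, Query}
    import scala.concurrent.{ExecutionContext, Future}

    object ResultUsageSketch {
      // Traverse and print a few selected columns per result edge.
      // After this change, getEdges yields a single StepResult instead of
      // Seq[QueryRequestWithResult].
      def printTraverseResult(graph: Graph, query: Query)(implicit ec: ExecutionContext): Future[Unit] =
        graph.getEdges(query).map { stepResult =>
          stepResult.results.foreach { r =>
            // selectValues is the new convenience accessor added to Edge in this commit.
            println(r.s2Edge.selectValues(Seq("from", "to", "label", "timestamp"), score = r.score))
          }
        }

      // Edge.toEdge builds an Edge from raw ids/props, replacing Management.toEdge.
      // The label "friends" must already exist in the metadata store (placeholder name).
      def buildEdge(): Edge =
        Edge.toEdge(
          srcId = 1,
          tgtId = 101,
          labelName = "friends",
          direction = "out",
          props = Map("time" -> 10),
          ts = System.currentTimeMillis(),
          operation = "insert")
    }

The notable shift is that traversal results are carried in one StepResult (result edges with scores, degree edges, and grouped results together) rather than in per-QueryParam QueryRequestWithResult values, which is what lets PostProcess.toJson serialize a query response from a single object.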
Pull Request: Closes #96

Authors
  DO YUNG YOON: [email protected]

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/8dbb9a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/8dbb9a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/8dbb9a3e

Branch: refs/heads/master
Commit: 8dbb9a3eeff6f412fe5defee2baffee32f174981
Parents: b590831
Author: DO YUNG YOON <[email protected]>
Authored: Wed Nov 16 16:56:41 2016 +0100
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Nov 16 17:01:07 2016 +0100

----------------------------------------------------------------------
 CHANGES | 2 +
 .../org/apache/s2graph/core/mysqls/schema.sql | 1 +
 .../scala/org/apache/s2graph/core/Edge.scala | 82 ++-
 .../apache/s2graph/core/ExceptionHandler.scala | 19 +-
 .../scala/org/apache/s2graph/core/Graph.scala | 232 ++++---
 .../apache/s2graph/core/GraphExceptions.scala | 2 +
 .../org/apache/s2graph/core/Management.scala | 69 +--
 .../org/apache/s2graph/core/OrderingUtil.scala | 7 +
 .../org/apache/s2graph/core/PostProcess.scala | 601 ++++---------
 .../org/apache/s2graph/core/QueryParam.scala | 75 ++-
 .../org/apache/s2graph/core/QueryResult.scala | 234 +++++++-
 .../scala/org/apache/s2graph/core/Vertex.scala | 74 ++-
 .../apache/s2graph/core/mysqls/Experiment.scala | 3 +-
 .../org/apache/s2graph/core/mysqls/Label.scala | 59 +-
 .../s2graph/core/mysqls/ServiceColumn.scala | 31 +-
 .../s2graph/core/parsers/WhereParser.scala | 2 +-
 .../s2graph/core/rest/RequestParser.scala | 177 ++++--
 .../apache/s2graph/core/rest/RestHandler.scala | 182 +++---
 .../apache/s2graph/core/storage/Storage.scala | 336 ++++++-----
 .../core/storage/hbase/AsynchbaseStorage.scala | 147 +++--
 .../indexedge/wide/IndexEdgeSerializable.scala | 2 +-
 .../org/apache/s2graph/core/utils/Logger.scala | 6 +
 .../org/apache/s2graph/core/EdgeTest.scala | 5 +-
 .../s2graph/core/Integrate/CrudTest.scala | 163 +++--
 .../core/Integrate/IntegrateCommon.scala | 10 +-
 .../s2graph/core/Integrate/QueryTest.scala | 159 ++---
 .../core/Integrate/StrongLabelDeleteTest.scala | 14 +-
 .../core/Integrate/VertexTestHelper.scala | 7 +-
 .../core/Integrate/WeakLabelDeleteTest.scala | 9 +-
 .../apache/s2graph/core/JsonParserTest.scala | 3 +-
 .../apache/s2graph/core/QueryParamTest.scala | 56 +-
 .../s2graph/core/TestCommonWithModels.scala | 2 +-
 .../core/benchmark/JsonBenchmarkSpec.scala | 62 +-
 .../core/benchmark/SamplingBenchmarkSpec.scala | 207 +++---
 .../org/apache/s2graph/rest/netty/Server.scala | 96 ++-
 .../apache/s2graph/rest/play/Bootstrap.scala | 4 +-
 .../s2graph/rest/play/config/Config.scala | 4 +
 .../controllers/ApplicationController.scala | 6 +-
 .../play/controllers/CounterController.scala | 21 +-
 .../rest/play/controllers/EdgeController.scala | 164 +++--
 .../play/controllers/ExperimentController.scala | 5 +-
 .../play/controllers/PublishController.scala | 8 -
 .../rest/play/controllers/QueryController.scala | 21 +-
 .../play/controllers/VertexController.scala | 18 +-
 s2rest_play/conf/reference.conf | 1 +
 s2rest_play/conf/routes | 7 +-
 46 files changed, 1892 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0c8edc3..73b3597 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@ Release 0.1.0 - unreleased
     S2GRAPH-127:
Refactor ExceptionHander Object into Class (Committed by DOYUNG YOON). + S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON). + BUG FIXES S2GRAPH-18: Query Option "interval" is Broken. http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql index d8ee5bc..822df6c 100644 --- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql +++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql @@ -96,6 +96,7 @@ CREATE TABLE `labels` ( `is_async` tinyint(4) NOT NULL default '0', `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4', `options` text, + `deleted_at` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_label` (`label`), INDEX `idx_labels_src_column_name` (`src_column_name`), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 4d4afe0..97730f3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -19,10 +19,11 @@ package org.apache.s2graph.core +import org.apache.s2graph.core.GraphExceptions.LabelNotExistException +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.JSONParser._ import play.api.libs.json.{JsNumber, Json} import scala.collection.JavaConversions._ import scala.util.hashing.MurmurHash3 @@ -112,7 +113,7 @@ case class IndexEdge(srcVertex: Vertex, lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal - // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } +// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } //TODO: // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList @@ -151,8 +152,23 @@ case class Edge(srcVertex: Vertex, val schemaVer = label.schemaVersion val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong + lazy val srcId = srcVertex.innerIdVal + lazy val tgtId = tgtVertex.innerIdVal + lazy val labelName = label.label + lazy val direction = GraphUtil.fromDirection(labelWithDir.dir) + lazy val properties = toProps() + def props = propsWithTs.mapValues(_.innerVal) + + private def toProps(): Map[String, Any] = { + for { + (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner + } yield { + labelMeta.name -> propsWithTs.getOrElse(labelMeta.seq, defaultVal).innerVal.value + } + } + def relatedEdges = { if (labelWithDir.isDirected) { val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) @@ -204,10 +220,10 @@ case class Edge(srcVertex: Vertex, def isDegree = propsWithTs.contains(LabelMeta.degreeSeq) - // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { - // case Some(_) => props - // case None => 
props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) - // } +// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { +// case Some(_) => props +// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) +// } def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0) @@ -290,8 +306,31 @@ case class Edge(srcVertex: Vertex, ret.mkString("\t") } + + def selectValues(selectColumns: Seq[String], + useToString: Boolean = true, + score: Double = 0.0): Seq[Option[Any]] = { + //TODO: Option should be matched in JsonParser anyTo* + for { + selectColumn <- selectColumns + } yield { + val valueOpt = selectColumn match { + case LabelMeta.from.name | "from" => Option(srcId) + case LabelMeta.to.name | "to" => Option(tgtId) + case "label" => Option(labelName) + case "direction" => Option(direction) + case "score" => Option(score) + case LabelMeta.timestamp.name | "timestamp" => Option(ts) + case _ => + properties.get(selectColumn) + } + if (useToString) valueOpt.map(_.toString) + else valueOpt + } + } } + case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge], edgesToInsert: List[IndexEdge] = List.empty[IndexEdge], newSnapshotEdge: Option[SnapshotEdge] = None) { @@ -310,6 +349,33 @@ object Edge { val incrementVersion = 1L val minTsVal = 0L + def toEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): Edge = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + + val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion) + val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion) + + val srcColId = label.srcColumn.id.get + val tgtColId = label.tgtColumn.id.get + + val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()) + val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + + val labelWithDir = LabelWithDirection(label.id.get, dir) + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs) + } + /** now version information is required also **/ type State = Map[Byte, InnerValLikeWithTs] type PropsPairWithTs = (State, State, Long, String) @@ -387,7 +453,7 @@ object Edge { // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") // logger.error(s"$propsWithTs") - (requestEdge, edgeMutate) + (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate) } } @@ -582,7 +648,7 @@ object Edge { (propsWithTs, true) } - def fromString(s: String): Option[Edge] = Graph.toEdge(s) +// def fromString(s: String): Option[Edge] = Graph.toEdge(s) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala index d03d483..48976d3 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala @@ -24,8 +24,11 @@ import java.util.Properties import com.typesafe.config.Config import org.apache.kafka.clients.producer._ import org.apache.s2graph.core.utils.logger +import play.api.libs.json.JsValue class ExceptionHandler(config: Config) { + + import ExceptionHandler._ val keyBrokerList = "kafka.metadata.broker.list" @@ -43,7 +46,6 @@ class ExceptionHandler(config: Config) { } } else None - def enqueue(m: KafkaMessage): Unit = { producer match { case None => logger.debug(s"skip log to Kafka: ${m}") @@ -68,24 +70,33 @@ object ExceptionHandler { type Key = String type Val = String - def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = { + def toKafkaMessage(topic: String, + element: GraphElement, + originalString: Option[String] = None, + produceJson: Boolean = false) = { + val edgeString = originalString.getOrElse(element.toLogString()) + val msg = edgeString + KafkaMessage( new ProducerRecord[Key, Val]( topic, element.queuePartitionKey, - originalString.getOrElse(element.toLogString()))) + msg)) } + // only used in deleteAll def toKafkaMessage(topic: String, tsv: String) = { KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) } + def toKafkaMessage(topic: String, jsValue: JsValue): KafkaMessage = toKafkaMessage(topic, jsValue.toString()) + case class KafkaMessage(msg: ProducerRecord[Key, Val]) private def toKafkaProp(config: Config) = { val props = new Properties() - /* all default configuration for new producer */ + /** all default configuration for new producer */ val brokers = if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list") else "localhost" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala index c25b71a..2cbddea 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -23,11 +23,13 @@ import java.util import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.{Config, ConfigFactory} +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{Label, Model} import org.apache.s2graph.core.parsers.WhereParser import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection} import org.apache.s2graph.core.utils.logger +import play.api.libs.json.{JsObject, Json} import scala.collection.JavaConversions._ import scala.collection._ @@ -58,6 +60,7 @@ object Graph { "back.off.timeout" -> java.lang.Integer.valueOf(1000), "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), + "delete.all.fetch.count" -> java.lang.Integer.valueOf(200), "future.cache.max.size" -> java.lang.Integer.valueOf(100000), "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), @@ -84,10 +87,9 @@ object Graph { (hashKey, filterHashKey) } - def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = { + def 
alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = { val vertices = for { - queryResult <- queryResultLs - edgeWithScore <- queryResult.edgeWithScoreLs + edgeWithScore <- edgeWithScoreLs edge = edgeWithScore.edge vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex } yield (edge.labelWithDir, vertex) -> true @@ -132,81 +134,36 @@ object Graph { tsVal } - def aggregateScore(newScore: Double, - resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)], - duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]], - edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)], - hashKey: HashKey, - filterHashKey: FilterHashKey, - queryParam: QueryParam, - convertedEdge: Edge) = { - - /* skip duplicate policy check if consistencyLevel is strong */ - if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) { - val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey) + def processDuplicates(queryParam: QueryParam, + duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = { + + if (queryParam.label.consistencyLevel != "strong") { //TODO: queryParam.duplicatePolicy match { - case Query.DuplicatePolicy.First => // do nothing - case Query.DuplicatePolicy.Raw => - if (duplicateEdges.containsKey(hashKey)) { - duplicateEdges.get(hashKey).append(convertedEdge -> newScore) - } else { - val newBuffer = new ListBuffer[(Edge, Double)] - newBuffer.append(convertedEdge -> newScore) - duplicateEdges.put(hashKey, newBuffer) - } + case Query.DuplicatePolicy.First => Seq(duplicates.head) + case Query.DuplicatePolicy.Raw => duplicates case Query.DuplicatePolicy.CountSum => - resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1)) + val countSum = duplicates.size + Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum)) case _ => - resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore)) + val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score } + Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum)) } } else { - resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore)) - edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore)) + duplicates } } - - def aggregateResults(queryRequestWithResult: QueryRequestWithResult, - queryParamResult: Result, - edgesToInclude: util.HashSet[FilterHashKey], - edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get - - val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult - val edgesWithScores = for { - (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - score = resultEdges.get(hashKey)._3 - (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold - } yield EdgeWithScore(duplicateEdge, aggregatedScore) - - QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores)) - } - - def fetchDuplicatedEdges(edge: Edge, - score: Double, - hashKey: HashKey, - duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = { - (edge -> score) +: (if 
(duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty) - } - - def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get - val whereFilter = queryParam.where.get - if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs - else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge)) - } - - def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]], + def filterEdges(q: Query, + stepIdx: Int, + queryRequests: Seq[QueryRequest], + queryResultLsFuture: Future[Seq[StepInnerResult]], + queryParams: Seq[QueryParam], alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean]) - (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = { + (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = { queryResultLsFuture.map { queryRequestWithResultLs => - if (queryRequestWithResultLs.isEmpty) Nil + if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty else { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get - val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get val step = q.steps(stepIdx) val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None @@ -219,73 +176,114 @@ object Graph { val edgesToExclude = new util.HashSet[FilterHashKey]() val edgesToInclude = new util.HashSet[FilterHashKey]() - val queryParamResultLs = new ListBuffer[Result] - queryRequestWithResultLs.foreach { queryRequestWithResult => - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]() + val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]() + val params = new mutable.HashMap[HashKey, QueryParam]() + var numOfDuplicates = 0 + var numOfTotal = 0 + queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) => val queryParam = queryRequest.queryParam - val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]() - val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]() - val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)] val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) - - // store degree value with Array.empty so if degree edge exist, it comes at very first. 
- def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore => - edgeWithScore.edge.isDegree - } - var isDegree = checkDegree() - val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey) val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey) - - queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore => - val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + val where = queryParam.where.get + + for { + edgeWithScore <- stepInnerResult.edgesWithScoreLs + if where == WhereParser.success || where.filter(edgeWithScore.edge) + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + } { + numOfTotal += 1 if (queryParam.transformer.isDefault) { val convertedEdge = edge - val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree) + val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false) - /* check if this edge should be exlcuded. */ - if (shouldBeExcluded && !isDegree) { + /** check if this edge should be exlcuded. */ + if (shouldBeExcluded) { edgesToExclude.add(filterHashKey) } else { - if (shouldBeIncluded && !isDegree) { + if (shouldBeIncluded) { edgesToInclude.add(filterHashKey) } val tsVal = processTimeDecay(queryParam, convertedEdge) val newScore = labelWeight * score * tsVal - aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge) + val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore) + sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore)) + agg.get(hashKey) match { + case None => + val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]() + newLs += (filterHashKey -> newEdgeWithScore) + agg += (hashKey -> newLs) + case Some(old) => + numOfDuplicates += 1 + old += (filterHashKey -> newEdgeWithScore) + } + params += (hashKey -> queryParam) } } else { convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge => - val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree) + val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false) - /* check if this edge should be exlcuded. */ - if (shouldBeExcluded && !isDegree) { + /** check if this edge should be exlcuded. 
*/ + if (shouldBeExcluded) { edgesToExclude.add(filterHashKey) } else { - if (shouldBeIncluded && !isDegree) { + if (shouldBeIncluded) { edgesToInclude.add(filterHashKey) } val tsVal = processTimeDecay(queryParam, convertedEdge) val newScore = labelWeight * score * tsVal - aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge) + val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore) + sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore)) + agg.get(hashKey) match { + case None => + val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]() + newLs += (filterHashKey -> newEdgeWithScore) + agg += (hashKey -> newLs) + case Some(old) => + numOfDuplicates += 1 + old += (filterHashKey -> newEdgeWithScore) + } + params += (hashKey -> queryParam) } } } - isDegree = false } - val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted) - queryParamResultLs.append(ret) } - val aggregatedResults = for { - (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs) - } yield { - aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude) + + val edgeWithScoreLs = new ListBuffer[EdgeWithScore]() + if (numOfDuplicates == 0) { + // no duplicates at all. + for { + (hashKey, filterHashKey, edgeWithScore) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + } { + edgeWithScoreLs += edgeWithScore } + } else { + // need to resolve duplicates. + val seen = new mutable.HashSet[HashKey]() + for { + (hashKey, filterHashKey, edgeWithScore) <- sequentialLs + if !seen.contains(hashKey) + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + } { + val queryParam = params(hashKey) + val duplicates = processDuplicates(queryParam, agg(hashKey)) + duplicates.foreach { case (_, duplicate) => + if (duplicate.score >= queryParam.threshold) { + seen += hashKey + edgeWithScoreLs += duplicate + } + } + } + } - aggregatedResults + val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) + StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees) } } } @@ -310,7 +308,7 @@ object Graph { element } recover { case e: Exception => - logger.error(s"$e", e) + logger.error(s"[toElement]: $e", e) None } get @@ -323,37 +321,33 @@ object Graph { toEdge(GraphUtil.split(s)) } - //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}" - //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431}, def toEdge(parts: Array[String]): Option[Edge] = Try { val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) parts(6) else "{}" + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] val tempDirection = if (parts.length >= 8) parts(7) else "out" val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - - val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props) - // logger.debug(s"toEdge: $edge") - Some(edge) + val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) + Option(edge) } recover { case e: Exception => - logger.error(s"toEdge: $e", e) + logger.error(s"[toEdge]: $e", e) throw e } get - 
//"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}" def toVertex(parts: Array[String]): Option[Vertex] = Try { val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) parts(6) else "{}" - Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation) + Option(vertex) } recover { case e: Throwable => - logger.error(s"toVertex: $e", e) + logger.error(s"[toVertex]: $e", e) throw e } get - def initStorage(config: Config)(ec: ExecutionContext) = { + def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = { config.getString("s2graph.storage.backend") match { - case "hbase" => new AsynchbaseStorage(config)(ec) + case "hbase" => new AsynchbaseStorage(graph, config)(ec) case _ => throw new RuntimeException("not supported storage.") } } @@ -366,7 +360,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { Model.loadCache() // TODO: Make storage client by config param - val storage = Graph.initStorage(config)(ec) + val storage = Graph.initStorage(this, config)(ec) for { @@ -375,9 +369,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") /** select */ - def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params) + def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params) + + def getEdges(q: Query): Future[StepResult] = storage.getEdges(q) - def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q) + def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq) def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala index 0898ffa..2f090cf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala @@ -31,6 +31,8 @@ object GraphExceptions { case class LabelAlreadyExistException(msg: String) extends Exception(msg) + case class LabelNameTooLongException(msg: String) extends Exception(msg) + case class InternalException(msg: String) extends Exception(msg) case class IllegalDataTypeException(msg: String) extends Exception(msg) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 39020c7..9ef7c14 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -19,7 +19,7 @@ package 
org.apache.s2graph.core -import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException} +import org.apache.s2graph.core.GraphExceptions.{LabelNameTooLongException, InvalidHTableException, LabelAlreadyExistException, LabelNotExistException} import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.types.HBaseType._ @@ -48,9 +48,9 @@ object Management { import HBaseType._ + val LABEL_NAME_MAX_LENGTH = 100 val DefaultCompressionAlgorithm = "gz" - def findService(serviceName: String) = { Service.findByName(serviceName, useCache = false) } @@ -190,62 +190,6 @@ object Management { } } - def toEdge(ts: Long, operation: String, srcId: String, tgtId: String, - labelStr: String, direction: String = "", props: String): Edge = { - - val label = tryOption(labelStr, getServiceLabel) - val dir = - if (direction == "") -// GraphUtil.toDirection(label.direction) - GraphUtil.directions("out") - else - GraphUtil.toDirection(direction) - - // logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}") - // logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}") - - val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion) - val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion) - - val srcColId = label.srcColumn.id.get - val tgtColId = label.tgtColumn.id.get - val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) { - (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()), - Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())) - } else { - (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()), - Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis())) - } - - // val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction) - val labelWithDir = LabelWithDirection(label.id.get, dir) - val op = tryOption(operation, GraphUtil.toOp) - - val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) - val parsedProps = toProps(label, jsObject.fields).toMap - val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts)) - - Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs) - - } - - def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = { - Service.findByName(serviceName) match { - case None => throw new RuntimeException(s"$serviceName does not exist. create service first.") - case Some(service) => - ServiceColumn.find(service.id.get, columnName) match { - case None => throw new RuntimeException(s"$columnName is not exist. 
create service column first.") - case Some(col) => - val idVal = toInnerVal(id, col.columnType, col.schemaVersion) - val op = tryOption(operation, GraphUtil.toOp) - val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) - val parsedProps = toProps(col, jsObject).toMap - Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op) - } - } - } - def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = { val props = for { @@ -346,7 +290,7 @@ class Management(graph: Graph) { schemaVersion: String = DEFAULT_VERSION, isAsync: Boolean, compressionAlgorithm: String = "gz", - options: Option[String] = None): Try[Label] = { + options: Option[String]): Try[Label] = { val labelOpt = Label.findByName(label, useCache = false) @@ -355,7 +299,8 @@ class Management(graph: Graph) { case Some(l) => throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.") case None => - /* create all models */ + /** create all models */ + if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : 40 )") val newLabel = Label.insertAll(label, srcServiceName, srcColumnName, srcColumnType, tgtServiceName, tgtColumnName, tgtColumnType, @@ -387,8 +332,8 @@ class Management(graph: Graph) { * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. */ def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { - val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists.")) - if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") + val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists.")) + if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } val allIndices = old.indices.map { index => Index(index.name, index.propNames) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala index 0ecbf4e..eaadb39 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala @@ -49,8 +49,15 @@ object OrderingUtil { case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv) case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv) case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv) + case (xv: BigDecimal, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(xv, yv) case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv) case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv) + case (xv: BigDecimal, yv: Long) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv)) + case (xv: Long, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv) + case (xv: BigDecimal, yv: Int) => 
implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv)) + case (xv: Int, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv) + case (xv: BigDecimal, yv: Double) => implicitly[Ordering[BigDecimal]].compare(xv, BigDecimal(yv)) + case (xv: Double, yv: BigDecimal) => implicitly[Ordering[BigDecimal]].compare(BigDecimal(xv), yv) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 9ab08b3..7cc2420 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -20,11 +20,13 @@ package org.apache.s2graph.core import org.apache.s2graph.core.GraphExceptions.BadQueryException -import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.types.{InnerVal, InnerValLike} -import play.api.libs.json.{Json, _} import org.apache.s2graph.core.JSONParser._ -import scala.collection.mutable.ListBuffer +import play.api.libs.json.{Json, _} + +import scala.collection.immutable +import scala.collection.mutable.{ArrayBuffer, ListBuffer} object PostProcess { @@ -32,6 +34,7 @@ object PostProcess { type EDGE_VALUES = Map[String, JsValue] type ORDER_BY_VALUES = (Any, Any, Any, Any) type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES) + type GROUP_BY_KEY = Map[String, JsValue] /** * Result Entity score field name @@ -47,523 +50,153 @@ object PostProcess { val SCORE_FIELD_NAME = "scoreSum" val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props") - def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)} - val groupedEdgesWithRank = (for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) - } yield { - (queryRequest.queryParam, edge, score) - }).groupBy { - case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") => - (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree) - case (queryParam, edge, rank) => - (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree) - } - val ret = for { - ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge - edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head) - id <- innerValToJsValue(target, tgtColumn.columnType) - } yield { - Json.obj("name" -> tgtColumn.columnName, "id" -> id, - SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum, - "label" -> labelName, - "aggr" -> Json.obj( - "name" -> srcColumn.columnName, - "ids" -> edgesWithRanks.flatMap { case (queryParam, 
edge, rank) => - innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType) - }, - "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) => - Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType), - "props" -> propsToJson(edge), - "score" -> rank - ) - } - ) - ) - } - - ret.toList - } - - def sortWithFormatted(jsons: Seq[JsObject], - scoreField: String = "scoreSum", - queryRequestWithResultLs: Seq[QueryRequestWithResult], - decrease: Boolean = true): JsObject = { - val ordering = if (decrease) -1 else 1 - val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering } - - if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) - else Json.obj( - "size" -> sortedJsons.size, - "results" -> sortedJsons, - "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId() - ) - } - - private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = { - val ls = for { - field <- fields - } yield { - field match { - case "from" | "_from" => edge.srcVertex.innerId.toIdString() - case "to" | "_to" => edge.tgtVertex.innerId.toIdString() - case "label" => edge.labelWithDir.labelId - case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) - case "_timestamp" | "timestamp" => edge.ts - case _ => - queryParam.label.metaPropsInvMap.get(field) match { - case None => throw new RuntimeException(s"unknow column: $field") - case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match { - case None => labelMeta.defaultValue - case Some(propVal) => propVal - } - } - } + def s2EdgeParent(graph: Graph, + parentEdges: Seq[EdgeWithScore]): JsValue = { + if (parentEdges.isEmpty) JsNull + else { + val ancestors = for { + current <- parentEdges + parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull + } yield { + val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge) + s2EdgeToJsValue(s2Edge, current.score, false, parents = parents) + } + Json.toJson(ancestors) } - val ret = ls.hashCode() - ret - } - - def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = { - for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - q = queryRequest.query - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields) - } - - def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true) } - def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) - } - - def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) - } - - def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult]): JsValue = { - toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult]) - } - - private def orderBy(queryOption: QueryOption, - rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = { - import OrderingUtil._ - - if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) { - val ascendingLs = queryOption.orderByColumns.map(_._2) - rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs)) + def s2EdgeToJsValue(s2Edge: Edge, + score: Double, + isDegree: Boolean = false, + parents: JsValue = JsNull): JsValue = { + if (isDegree) { + Json.obj( + "from" -> anyValToJsValue(s2Edge.srcId), + "label" -> s2Edge.labelName, + LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value) + ) } else { - rawEdges + Json.obj("from" -> anyValToJsValue(s2Edge.srcId), + "to" -> anyValToJsValue(s2Edge.tgtId), + "label" -> s2Edge.labelName, + "score" -> score, + "props" -> JSONParser.propertiesToJson(s2Edge.properties), + "direction" -> s2Edge.direction, + "timestamp" -> anyValToJsValue(s2Edge.ts), + "parents" -> parents + ) } } - private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = { - column match { - case "score" => score - case "timestamp" | "_timestamp" => edge.ts - case _ => - keyWithJs.get(column) match { - case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get - case Some(x) => x - } + def withImpressionId(queryOption: QueryOption, + size: Int, + degrees: Seq[JsValue], + results: Seq[JsValue]): JsValue = { + queryOption.impIdOpt match { + case None => Json.obj( + "size" -> size, + "degrees" -> degrees, + "results" -> results + ) + case Some(impId) => + Json.obj( + "size" -> size, + "degrees" -> degrees, + "results" -> results, + Experiment.ImpressionKey -> impId + ) } } + def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = { + val props = for { + (k, v) <- s2Vertex.properties + jsVal <- anyValToJsValue(v) + } yield k -> jsVal - private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = { - def traverse(js: JsValue): JsValue = js match { - case JsNull => mapper(JsNull) - case JsNumber(v) => mapper(js) - case JsString(v) => mapper(js) - case JsBoolean(v) => mapper(js) - case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) }) - case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) }) + for { + id <- anyValToJsValue(s2Vertex.innerIdVal) + } yield { + Json.obj( + "serviceName" -> s2Vertex.serviceName, + "columnName" -> s2Vertex.columnName, + "id" -> id, + "props" -> Json.toJson(props), + "timestamp" -> s2Vertex.ts + ) } - - traverse(jsValue) } - /** test query with filterOut is not working since it can not diffrentate filterOut */ - private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = { - val cursors = _cursors.flatten.iterator + def verticesToJson(s2Vertices: Seq[Vertex]): JsValue = + Json.toJson(s2Vertices.flatMap(s2VertexToJson(_))) - buildReplaceJson(jsonQuery) { - case js@JsObject(fields) => - val isStep = fields.find { case (k, _) => k == "label" } // find label group - if (isStep.isEmpty) js - else { - // TODO: Order not ensured - val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next)) - JsObject(withCursor.toSeq) - } - case js => js - } - } + def withOptionalFields(queryOption: QueryOption, + size: Int, + degrees: Seq[JsValue], + 
results: Seq[JsValue], + failCount: Int = 0, + cursors: => JsValue, + nextQuery: => Option[JsValue]): JsValue = { - private def buildRawEdges(queryOption: QueryOption, - queryRequestWithResultLs: Seq[QueryRequestWithResult], - excludeIds: Map[Int, Boolean], - scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = { - val degrees = ListBuffer[JsValue]() - val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]() - val metaPropNamesMap = scala.collection.mutable.Map[String, Int]() - for { - queryRequestWithResult <- queryRequestWithResultLs - } yield { - val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - queryRequest.queryParam.label.metaPropNames.foreach { metaPropName => - metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1) - } - } - val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length) - val orderByColumns = queryOption.orderByColumns.filter { case (column, _) => - column match { - case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true - case _ => - propsExistInAll.contains(column) -// //TODO?? -// false -// queryParam.label.metaPropNames.contains(column) - } - } + val kvs = new ArrayBuffer[(String, JsValue)]() - /* build result jsons */ - for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - queryParam = queryRequest.queryParam - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields) - if !excludeIds.contains(hashKey) - } { + kvs.append("size" -> JsNumber(size)) + kvs.append("degrees" -> JsArray(degrees)) + kvs.append("results" -> JsArray(results)) - // edge to json - val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) - val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) - if (edge.isDegree && fromOpt.isDefined) { - if (queryOption.limitOpt.isEmpty) { - degrees += Json.obj( - "from" -> fromOpt.get, - "label" -> queryRequest.queryParam.label.label, - "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir), - LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG) - ) - } - } else { - val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam) - val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match { - case 0 => - (None, None, None, None) - case 1 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, None, None, None) - case 2 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, None, None) - case 3 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, v3, None) - case _ => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, v3, v4) - } + if 
(queryOption.impIdOpt.isDefined) kvs.append(Experiment.ImpressionKey -> JsString(queryOption.impIdOpt.get)) - val currentEdge = (keyWithJs, score * scoreWeight, orderByValues) - rawEdges += currentEdge - } - } - (degrees, rawEdges) + JsObject(kvs) } - private def buildResultJsValue(queryOption: QueryOption, - degrees: ListBuffer[JsValue], - rawEdges: ListBuffer[RAW_EDGE]): JsValue = { - if (queryOption.groupByColumns.isEmpty) { - // ordering - val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold) + def toJson(graph: Graph, + queryOption: QueryOption, + stepResult: StepResult): JsValue = { - val edges = queryOption.limitOpt match { - case None => orderBy(queryOption, filteredEdges).map(_._1) - case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit) - } - val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees - Json.obj( - "size" -> edges.size, - "degrees" -> resultDegrees, -// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), - "results" -> edges - ) - } else { - val grouped = rawEdges.groupBy { case (keyWithJs, _, _) => - val props = keyWithJs.get("props") + val degrees = + if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true)) + else emptyDegrees - for { - column <- queryOption.groupByColumns - value <- keyWithJs.get(column) match { - case None => props.flatMap { js => (js \ column).asOpt[JsValue] } - case Some(x) => Some(x) - } - } yield column -> value + if (queryOption.groupBy.keys.isEmpty) { + // no group by specified on query. + + val ls = stepResult.results.map { t => + val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull + s2EdgeToJsValue(t.s2Edge, t.score, false, parents) } + withImpressionId(queryOption, ls.size, degrees, ls) + } else { - val groupedEdgesWithScoreSum = + val results = for { - (groupByKeyVals, groupedRawEdges) <- grouped - scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold + (groupByValues, (scoreSum, edges)) <- stepResult.grouped } yield { - // ordering - val edges = orderBy(queryOption, groupedRawEdges).map(_._1) + val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) => + k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull) + } + val groupByValuesJson = Json.toJson(groupByKeyValues.toMap) - //TODO: refactor this - val js = if (queryOption.returnAgg) + if (!queryOption.returnAgg) { Json.obj( - "groupBy" -> Json.toJson(groupByKeyVals.toMap), + "groupBy" -> groupByValuesJson, "scoreSum" -> scoreSum, - "agg" -> edges + "agg" -> Json.arr() ) - else + } else { + val agg = edges.map { t => + val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull + s2EdgeToJsValue(t.s2Edge, t.score, false, parents) + } + val aggJson = Json.toJson(agg) Json.obj( - "groupBy" -> Json.toJson(groupByKeyVals.toMap), + "groupBy" -> groupByValuesJson, "scoreSum" -> scoreSum, - "agg" -> Json.arr() + "agg" -> aggJson ) - (js, scoreSum) - } - - val groupedSortedJsons = queryOption.limitOpt match { - case None => - groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1) - case Some(limit) => - groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit) - } - val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees - Json.obj( - "size" -> groupedSortedJsons.size, - "degrees" -> resultDegrees, -// "queryNext" -> 
buildNextQuery(q.jsonQuery, q.cursorStrings()), - "results" -> groupedSortedJsons - ) - } - } - - def toSimpleVertexArrJsonMulti(queryOption: QueryOption, - resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])], - excludes: Seq[QueryRequestWithResult]): JsValue = { - val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) => - acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap - } - - val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE]) - for { - (result, localExclude) <- resultWithExcludeLs - } { - val newResult = result.map { queryRequestWithResult => - val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val newQuery = queryRequest.query.copy(queryOption = queryOption) - queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery)) - } - val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds) - degrees ++= _degrees - rawEdges ++= _rawEdges - } - buildResultJsValue(queryOption, degrees, rawEdges) - } - - def toSimpleVertexArrJson(queryOption: QueryOption, - queryRequestWithResultLs: Seq[QueryRequestWithResult], - exclude: Seq[QueryRequestWithResult]): JsValue = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds) - buildResultJsValue(queryOption, degrees, rawEdges) - } - - - def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult], - exclude: Seq[QueryRequestWithResult]): JsValue = { - - queryRequestWithResultLs.headOption.map { queryRequestWithResult => - val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val query = queryRequest.query - val queryOption = query.queryOption - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds) - buildResultJsValue(queryOption, degrees, rawEdges) - } getOrElse emptyResults - } - - def verticesToJson(vertices: Iterable[Vertex]) = { - Json.toJson(vertices.flatMap { v => vertexToJson(v) }) - } - - def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - for { - (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) - labelMeta <- queryParam.label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType) - } yield labelMeta.name -> jsValue - } - - private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = { - if (parentEdges.isEmpty) { - JsNull - } else { - val parents = for { - parent <- parentEdges - (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get - parentQueryParam = QueryParam(parentEdge.labelWithDir) - parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull - } yield { - val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge) - val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents) - Json.toJson(edgeJson) - } - - Json.toJson(parents) - } - } - - /** TODO */ - def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) - - val kvMapOpt = for { - from <- 
innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) - to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType) - } yield { - val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props" - val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam) - val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap - - val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) => - val jsValue = column match { - case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp)) - case "from" => from - case "to" => to - case "label" => JsString(queryParam.label.label) - case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) - case "_timestamp" | "timestamp" => JsNumber(edge.ts) - case "score" => JsNumber(score) - case "props" if propsMap.nonEmpty => Json.toJson(propsMap) - case _ => JsNull + } } - - if (jsValue == JsNull) map else map + (column -> jsValue) - } - kvMap - } - - kvMapOpt.getOrElse(Map.empty) - } - - def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - val kvs = edgeToJsonInner(edge, score, q, queryParam) - if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam))) - else kvs - } - - def vertexToJson(vertex: Vertex): Option[JsObject] = { - val serviceColumn = ServiceColumn.findById(vertex.id.colId) - - for { - id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType) - } yield { - Json.obj("serviceName" -> serviceColumn.service.serviceName, - "columnName" -> serviceColumn.columnName, - "id" -> id, "props" -> propsToJson(vertex), - "timestamp" -> vertex.ts, - // "belongsTo" -> vertex.belongLabelIds) - "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label))) + withImpressionId(queryOption, results.size, degrees, results) } } - - private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = { - for { - (seq, value) <- props - name <- seqsToNames.get(seq) - } yield (name, value) - } - - private def propsToJson(vertex: Vertex) = { - val serviceColumn = vertex.serviceColumn - val props = for { - (propKey, innerVal) <- vertex.props - columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true) - jsValue <- innerValToJsValue(innerVal, columnMeta.dataType) - } yield { - (columnMeta.name -> jsValue) - } - props.toMap - } - - def propsToJson(edge: Edge) = { - for { - (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) - metaProp <- edge.label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType) - } yield { - (metaProp.name, jsValue) - } - } - - def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - - - val groupedEdgesWithRank = (for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) - } yield { - (edge, score) - }).groupBy { case (edge, score) => - 
(edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId) - } - - val jsons = for { - ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank - (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip - tgtId <- innerValToJsValue(target, tgtColumn.columnType) - } yield { - Json.obj(tgtColumn.columnName -> tgtId, - s"${srcColumn.columnName}s" -> - edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum) - } - val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse - if (queryRequestWithResultLs.isEmpty) { - Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) - } else { - Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons, - "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()) - } - - } - - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 28d78a9..ef40688 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -21,6 +21,7 @@ package org.apache.s2graph.core import com.google.common.hash.Hashing import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.parsers.{Where, WhereParser} import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection} @@ -51,6 +52,12 @@ object Query { } } +object GroupBy { + val Empty = GroupBy() +} +case class GroupBy(keys: Seq[String] = Nil, + limit: Int = Int.MaxValue) + case class MultiQuery(queries: Seq[Query], weights: Seq[Double], queryOption: QueryOption, @@ -58,7 +65,7 @@ case class MultiQuery(queries: Seq[Query], case class QueryOption(removeCycle: Boolean = false, selectColumns: Seq[String] = Seq.empty, - groupByColumns: Seq[String] = Seq.empty, + groupBy: GroupBy = GroupBy.Empty, orderByColumns: Seq[(String, Boolean)] = Seq.empty, filterOutQuery: Option[Query] = None, filterOutFields: Seq[String] = Seq(LabelMeta.to.name), @@ -67,8 +74,11 @@ case class QueryOption(removeCycle: Boolean = false, limitOpt: Option[Int] = None, returnAgg: Boolean = true, scoreThreshold: Double = Double.MinValue, - returnDegree: Boolean = true) - + returnDegree: Boolean = true, + impIdOpt: Option[String] = None) { + val orderByKeys = orderByColumns.map(_._1) + val ascendingVals = orderByColumns.map(_._2) +} case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], steps: IndexedSeq[Step] = Vector.empty[Step], @@ -77,8 +87,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], val removeCycle = queryOption.removeCycle val selectColumns = queryOption.selectColumns -// val groupBy = queryOption.groupBy - val groupByColumns = queryOption.groupByColumns + val groupBy = queryOption.groupBy val orderByColumns = queryOption.orderByColumns val filterOutQuery = queryOption.filterOutQuery val filterOutFields = queryOption.filterOutFields @@ -90,7 +99,7 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], def cacheKeyBytes: Array[Byte] = { val selectBytes = 
Bytes.toBytes(queryOption.selectColumns.toString) - val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString) + val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString) val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString) val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte]) val returnTreeBytes = Bytes.toBytes(queryOption.returnTree) @@ -279,11 +288,23 @@ case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = S bytes } } +case class S2Request(labelName: String, + direction: String = "out", + ts: Long = System.currentTimeMillis(), + options: Map[String, Any] = Map.empty) { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + val labelWithDir = LabelWithDirection(label.id.get, dir) + //TODO: need to merge options into queryParam. + val queryParam = QueryParam(labelWithDir, ts) +} object QueryParam { lazy val Empty = QueryParam(LabelWithDirection(0, 0)) lazy val DefaultThreshold = Double.MinValue val Delimiter = "," + val maxMetaByte = (-1).toByte + val fillArray = Array.fill(100)(maxMetaByte) } case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) { @@ -381,13 +402,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System } def limit(offset: Int, limit: Int): QueryParam = { - /* since degree info is located on first always */ - if (offset == 0 && this.columnRangeFilter == null) { - this.limit = limit + 1 - this.offset = offset - } else { - this.limit = limit - this.offset = offset + 1 + /** since degree info is located on first always */ + this.limit = limit + this.offset = offset + + if (this.columnRangeFilter == null) { + if (offset == 0) this.limit = limit + 1 + else this.offset = offset + 1 } // this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset) this @@ -400,26 +421,24 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System } } - def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = { - // val len = label.orderTypes.size.toByte - // val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte - // logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}") - val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte + def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]) = { + val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray) + val toVal = propsToBytes(to) - val minMetaByte = InnerVal.minMetaByte - // val maxMetaByte = InnerVal.maxMetaByte - val maxMetaByte = -1.toByte - val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte)) - //FIXME - val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte)) toVal(0) = len fromVal(0) = len - val maxBytes = fromVal - val minBytes = toVal + + val minMax = (toVal, fromVal) // inverted + minMax + } + + def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = { + val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte + val (minBytes, maxBytes) = paddingInterval(len, from, to) + this.columnRangeFilterMaxBytes = maxBytes this.columnRangeFilterMinBytes = minBytes - val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true) - this.columnRangeFilter = rangeFilter + 
this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true) this } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index 5343659..5b2622f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -21,31 +21,42 @@ package org.apache.s2graph.core import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs} +import org.apache.s2graph.core.utils.logger -import scala.collection.Seq +import scala.collection.mutable.ListBuffer +import scala.collection.{Seq, mutable} object QueryResult { - def fromVertices(query: Query): Seq[QueryRequestWithResult] = { + def fromVertices(query: Query): StepInnerResult = { if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { - Seq.empty + StepInnerResult.Empty } else { val queryParam = query.steps.head.queryParams.head val label = queryParam.label val currentTs = System.currentTimeMillis() val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) - for { + val edgeWithScoreLs = for { vertex <- query.vertices } yield { - val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) - val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) - QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam), - QueryResult(edgeWithScoreLs = Seq(edgeWithScore))) - } + val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) + val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) + edgeWithScore + } + StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false) } } } -case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult) +/** inner traverse */ +object StepInnerResult { + val Failure = StepInnerResult(Nil, Nil, true) + val Empty = StepInnerResult(Nil, Nil, false) +} +case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore], + degreeEdges: Seq[EdgeWithScore], + isFailure: Boolean = false) { + val isEmpty = edgesWithScoreLs.isEmpty +} case class QueryRequest(query: Query, stepIdx: Int, @@ -59,3 +70,206 @@ case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil, isFailure: Boolean = false) case class EdgeWithScore(edge: Edge, score: Double) + + + + + +/** result */ + +object StepResult { + + type Values = Seq[S2EdgeWithScore] + type GroupByKey = Seq[Option[Any]] + val EmptyOrderByValues = (None, None, None, None) + val Empty = StepResult(Nil, Nil, Nil) + + + def mergeOrdered(left: StepResult.Values, + right: StepResult.Values, + queryOption: QueryOption): (Double, StepResult.Values) = { + val merged = (left ++ right) + val scoreSum = merged.foldLeft(0.0) { case (prev, current) => prev + current.score } + if (scoreSum < queryOption.scoreThreshold) (0.0, Nil) + else { + val ordered = orderBy(queryOption, merged) + val filtered = ordered.take(queryOption.groupBy.limit) + val newScoreSum = filtered.foldLeft(0.0) { case (prev, current) => prev + current.score } + (newScoreSum, filtered) + } + } + + def orderBy(queryOption: QueryOption, notOrdered: Values): Values = { + import OrderingUtil._ + + if (queryOption.withScore && 
queryOption.orderByColumns.nonEmpty) { + notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](queryOption.ascendingVals)) + } else { + notOrdered + } + } + def toOrderByValues(s2Edge: Edge, + score: Double, + orderByKeys: Seq[String]): (Any, Any, Any, Any) = { + def toValue(propertyKey: String): Any = { + propertyKey match { + case "score" => score + case "timestamp" | "_timestamp" => s2Edge.ts + case _ => s2Edge.properties.get(propertyKey) + } + } + if (orderByKeys.isEmpty) (None, None, None, None) + else { + orderByKeys.length match { + case 1 => + (toValue(orderByKeys(0)), None, None, None) + case 2 => + (toValue(orderByKeys(0)), toValue(orderByKeys(1)), None, None) + case 3 => + (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), None) + case _ => + (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), toValue(orderByKeys(3))) + } + } + } + /** + * merge multiple StepResult into one StepResult. + * @param queryOption + * @param multiStepResults + * @return + */ + def merges(queryOption: QueryOption, + multiStepResults: Seq[StepResult], + weights: Seq[Double] = Nil): StepResult = { + val degrees = multiStepResults.flatMap(_.degreeEdges) + val ls = new mutable.ListBuffer[S2EdgeWithScore]() + val agg= new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]() + val sums = new mutable.HashMap[GroupByKey, Double]() + + for { + (weight, eachStepResult) <- weights.zip(multiStepResults) + (ordered, grouped) = (eachStepResult.results, eachStepResult.grouped) + } { + ordered.foreach { t => + val newScore = t.score * weight + ls += t.copy(score = newScore) + } + + // process each query's stepResult's grouped + for { + (groupByKey, (scoreSum, values)) <- grouped + } { + val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore]) + var scoreSum = 0.0 + values.foreach { t => + val newScore = t.score * weight + buffer += t.copy(score = newScore) + scoreSum += newScore + } + sums += (groupByKey -> scoreSum) + } + } + + // process global groupBy + if (queryOption.groupBy.keys.nonEmpty) { + for { + s2EdgeWithScore <- ls + groupByKey = s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys) + } { + val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore]) + buffer += s2EdgeWithScore + val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score + sums += (groupByKey -> newScore) + } + } + + + val ordered = orderBy(queryOption, ls) + val grouped = for { + (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1) + aggregated = agg(groupByKey) if aggregated.nonEmpty + } yield groupByKey -> (scoreSum, aggregated) + + StepResult(results = ordered, grouped = grouped, degrees) + } + + //TODO: Optimize this. 
+ def filterOut(graph: Graph, + queryOption: QueryOption, + baseStepResult: StepResult, + filterOutStepInnerResult: StepInnerResult): StepResult = { + + val fields = if (queryOption.filterOutFields.isEmpty) Seq("to") else Seq("to") + // else queryOption.filterOutFields + val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t => + t.edge.selectValues(fields) + }.toSet + + val filteredResults = baseStepResult.results.filter { t => + val filterOutKey = t.s2Edge.selectValues(fields) + !filterOutSet.contains(filterOutKey) + } + + val grouped = for { + (key, (scoreSum, values)) <- baseStepResult.grouped + (out, in) = values.partition(v => filterOutSet.contains(v.s2Edge.selectValues(fields))) + newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty + } yield key -> (newScoreSum, in) + + + StepResult(results = filteredResults, grouped = grouped, baseStepResult.degreeEdges) + } + def apply(graph: Graph, + queryOption: QueryOption, + stepInnerResult: StepInnerResult): StepResult = { + logger.debug(s"[BeforePostProcess]: ${stepInnerResult.edgesWithScoreLs.size}") + + val results = for { + edgeWithScore <- stepInnerResult.edgesWithScoreLs + } yield { + val edge = edgeWithScore.edge + val orderByValues = + if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, None, None) + else toOrderByValues(edge, edgeWithScore.score, queryOption.orderByKeys) + + S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, edgeWithScore.edge.parentEdges) + } + /** ordered flatten result */ + val ordered = orderBy(queryOption, results) + + /** ordered grouped result */ + val grouped = + if (queryOption.groupBy.keys.isEmpty) Nil + else { + val agg = new mutable.HashMap[GroupByKey, (Double, Values)]() + results.groupBy { s2EdgeWithScore => + s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, useToString = true) + }.map { case (k, ls) => + val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption) + /** + * watch out here. by calling toString on Any, we lose type information which will be used + * later for toJson. + */ + if (merged.nonEmpty) { + val newKey = merged.head.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false) + agg += (newKey -> (scoreSum, merged)) + } + } + agg.toSeq.sortBy(_._2._1 * -1) + } + + val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, t.score)) + StepResult(results = ordered, grouped = grouped, degreeEdges = degrees) + } +} + +case class S2EdgeWithScore(s2Edge: Edge, + score: Double, + orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues, + parentEdges: Seq[EdgeWithScore] = Nil) + +case class StepResult(results: StepResult.Values, + grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))], + degreeEdges: StepResult.Values) { + val isEmpty = results.isEmpty +} \ No newline at end of file
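
The grouped result model added above (StepResult.results plus StepResult.grouped, keyed by GroupBy.keys, each group carrying a score sum, a per-group limit, and a score threshold, with groups ordered by scoreSum descending) can be summarized with a small standalone sketch. The MiniEdge/MiniEdgeWithScore names below are illustrative stand-ins, not the s2core classes; the sketch only mirrors the technique that StepResult.apply uses, under those simplifying assumptions.

object GroupedResultSketch {

  // Illustrative stand-ins for s2graph's edge types; not the real s2core classes.
  case class MiniEdge(to: String, props: Map[String, Any])
  case class MiniEdgeWithScore(edge: MiniEdge, score: Double)

  // Mirrors the shape of StepResult.grouped:
  //   Seq[(GroupByKey, (scoreSum, members))] with GroupByKey = Seq[Option[Any]].
  def group(edges: Seq[MiniEdgeWithScore],
            groupByKeys: Seq[String],
            scoreThreshold: Double,
            limitPerGroup: Int): Seq[(Seq[Option[Any]], (Double, Seq[MiniEdgeWithScore]))] = {
    edges
      .groupBy(e => groupByKeys.map(k => e.edge.props.get(k)))
      .toSeq
      .map { case (key, members) =>
        // order members within the group, apply the per-group limit, then sum scores
        val ordered = members.sortBy(m => -m.score).take(limitPerGroup)
        (key, (ordered.map(_.score).sum, ordered))
      }
      .filter { case (_, (scoreSum, _)) => scoreSum >= scoreThreshold } // drop low-scoring groups
      .sortBy { case (_, (scoreSum, _)) => -scoreSum }                  // groups by scoreSum, descending
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(
      MiniEdgeWithScore(MiniEdge("a", Map("age" -> 10)), 1.0),
      MiniEdgeWithScore(MiniEdge("b", Map("age" -> 10)), 2.0),
      MiniEdgeWithScore(MiniEdge("c", Map("age" -> 20)), 0.5))
    group(edges, Seq("age"), scoreThreshold = Double.MinValue, limitPerGroup = 10).foreach(println)
  }
}

On the query side, the same patch exposes this through QueryOption; since every other QueryOption field has a default, a caller can write, for example, QueryOption(groupBy = GroupBy(keys = Seq("to"), limit = 10)).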
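
StepResult.filterOut above removes edges whose selected field values (effectively fixed to the "to" field in this patch) appear in the filter-out traverse result, and rebuilds each group's scoreSum from the surviving members. Below is a standalone sketch of the set-difference step for the flat results only; MiniEdge and selectValues are illustrative names, not the real Edge.selectValues implementation.

object FilterOutSketch {

  // Illustrative stand-ins; the real code keys edges by queryOption.filterOutFields
  // (currently "to") via Edge.selectValues.
  case class MiniEdge(from: String, to: String)
  case class MiniEdgeWithScore(edge: MiniEdge, score: Double)

  def selectValues(edge: MiniEdge, fields: Seq[String]): Seq[Option[Any]] =
    fields.map {
      case "from" => Some(edge.from)
      case "to"   => Some(edge.to)
      case _      => None
    }

  def filterOut(base: Seq[MiniEdgeWithScore],
                exclude: Seq[MiniEdgeWithScore],
                fields: Seq[String] = Seq("to")): Seq[MiniEdgeWithScore] = {
    // collect the field values of every edge returned by the filter-out traverse
    val excludedKeys = exclude.map(t => selectValues(t.edge, fields)).toSet
    // keep only base edges whose field values are not in that set
    base.filterNot(t => excludedKeys.contains(selectValues(t.edge, fields)))
  }

  def main(args: Array[String]): Unit = {
    val base = Seq(
      MiniEdgeWithScore(MiniEdge("u1", "a"), 1.0),
      MiniEdgeWithScore(MiniEdge("u1", "b"), 0.7))
    val exclude = Seq(MiniEdgeWithScore(MiniEdge("u2", "b"), 0.1))
    filterOut(base, exclude).foreach(println) // only the edge to "a" survives
  }
}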
