add TraversalHelper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7d082255 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7d082255 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7d082255 Branch: refs/heads/master Commit: 7d082255944228a34e8ea55d9609202ab65362b2 Parents: aa66822 Author: DO YUNG YOON <[email protected]> Authored: Sat Nov 4 07:19:27 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Nov 4 07:19:27 2017 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/PostProcess.scala | 380 ---------------- .../scala/org/apache/s2graph/core/S2Graph.scala | 58 +-- .../apache/s2graph/core/TraversalHelper.scala | 442 +++++++++++++++++++ .../apache/s2graph/core/storage/StorageIO.scala | 2 +- 4 files changed, 450 insertions(+), 432 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7d082255/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 4549d84..2d2e183 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -281,384 +281,4 @@ object PostProcess { withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery) } } - - /** Global helper functions */ - @tailrec - final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (range < sampleNumber || set.size == sampleNumber) set - else randomInt(sampleNumber, range, set + Random.nextInt(range)) - } - - def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { - if (edges.size <= n) { - edges - } else { - val plainEdges = if (queryRequest.queryParam.offset == 0) { - edges.tail - } else edges - - val randoms = randomInt(n, plainEdges.size) - var samples = List.empty[EdgeWithScore] - var idx = 0 - plainEdges.foreach { e => - if (randoms.contains(idx)) samples = e :: samples - idx += 1 - } - samples - } - } - - def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { - val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } - edgeWithScores.map { edgeWithScore => - edgeWithScore.copy(score = edgeWithScore.score / sum) - } - } - - def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = { - val vertices = for { - edgeWithScore <- edgeWithScoreLs - edge = edgeWithScore.edge - vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex - } yield (edge.labelWithDir, vertex) -> true - - vertices.toMap - } - - /** common methods for filter out, transform, aggregate queryResult */ - def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = { - for { - convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree - } yield convertedEdge - } - - def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = { - /* process time decay */ - val tsVal = queryParam.timeDecay match { - case None => 1.0 - case 
Some(timeDecay) => - val tsVal = try { - val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) - innerValWithTsOpt.map { innerValWithTs => - val innerVal = innerValWithTs.innerVal - timeDecay.labelMeta.dataType match { - case InnerVal.LONG => innerVal.value match { - case n: BigDecimal => n.bigDecimal.longValue() - case _ => innerVal.toString().toLong - } - case _ => innerVal.toString().toLong - } - } getOrElse (edge.ts) - } catch { - case e: Exception => - logger.error(s"processTimeDecay error. ${edge.toLogString}", e) - edge.ts - } - val timeDiff = queryParam.timestamp - tsVal - timeDecay.decay(timeDiff) - } - - tsVal - } - - def processDuplicates[R](queryParam: QueryParam, - duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = { - - if (queryParam.label.consistencyLevel != "strong") { - //TODO: - queryParam.duplicatePolicy match { - case DuplicatePolicy.First => Seq(duplicates.head) - case DuplicatePolicy.Raw => duplicates - case DuplicatePolicy.CountSum => - val countSum = duplicates.size - val (headFilterHashKey, headEdgeWithScore) = duplicates.head - Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) - case _ => - val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } - val (headFilterHashKey, headEdgeWithScore) = duplicates.head - Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) - } - } else { - duplicates - } - } - - def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = { - val src = edge.srcVertex.innerId.hashCode() - val tgt = edge.tgtVertex.innerId.hashCode() - val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) - val filterHashKey = (src, tgt) - - (hashKey, filterHashKey) - } - - def filterEdges(q: Query, - stepIdx: Int, - queryRequests: Seq[QueryRequest], - queryResultLsFuture: Future[Seq[StepResult]], - queryParams: Seq[QueryParam], - alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty, - buildLastStepInnerResult: Boolean = true, - parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { - - queryResultLsFuture.map { queryRequestWithResultLs => - val (cursors, failCount) = { - val _cursors = ArrayBuffer.empty[Array[Byte]] - var _failCount = 0 - - queryRequestWithResultLs.foreach { stepResult => - _cursors.append(stepResult.cursors: _*) - _failCount += stepResult.failCount - } - - _cursors -> _failCount - } - - - if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) - else { - val isLastStep = stepIdx == q.steps.size - 1 - val queryOption = q.queryOption - val step = q.steps(stepIdx) - - val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) - val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult - val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) - - if (shouldBuildInnerResults) { - val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => - edgeWithScore - } - - /* process step group by */ - val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) - - } else { - val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => - val edge = edgeWithScore.edge - val 
score = edgeWithScore.score - val label = edgeWithScore.label - - /* Select */ - val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) - - // val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) - val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) - - val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) - /* OrderBy */ - val orderByValues = - if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) - else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) - - /* StepGroupBy */ - val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) - - /* GroupBy */ - val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) - - /* FilterOut */ - val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) - - newEdgeWithScore.copy(orderByValues = orderByValues, - stepGroupByValues = stepGroupByValues, - groupByValues = groupByValues, - filterOutValues = filterOutValues) - } - - /* process step group by */ - val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - - /* process ordered list */ - val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil - - /* process grouped list */ - val grouped = - if (queryOption.groupBy.keys.isEmpty) Nil - else { - val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() - results.groupBy { edgeWithScore => - // edgeWithScore.groupByValues.map(_.map(_.toString)) - edgeWithScore.groupByValues - }.foreach { case (k, ls) => - val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) - - val newScoreSum = scoreSum - - /* - * watch out here. by calling toString on Any, we lose type information which will be used - * later for toJson. 
- */ - if (merged.nonEmpty) { - val newKey = merged.head.groupByValues - agg += ((newKey, (newScoreSum, merged))) - } - } - agg.toSeq.sortBy(_._2._1 * -1) - } - - StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) - } - } - } - } - - def toEdgeWithScores(queryRequest: QueryRequest, - stepResult: StepResult, - parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val prevScore = queryRequest.prevStepScore - val labelWeight = queryRequest.labelWeight - val edgeWithScores = stepResult.edgeWithScores - - val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent - val parents = if (shouldBuildParents) { - parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => - val edge = edgeWithScore.edge - val score = edgeWithScore.score - val label = edgeWithScore.label - - /* Select */ - val mergedPropsWithTs = - if (queryOption.selectColumns.isEmpty) { - edge.propertyValuesInner() - } else { - val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp)) - edge.propertyValues(queryOption.selectColumns) ++ initial - } - - val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) - edgeWithScore.copy(edge = newEdge) - } - } else Nil - - // skip - if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores - else { - val degreeScore = 0.0 - - val sampled = - if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) - else edgeWithScores - - val withScores = for { - edgeWithScore <- sampled - } yield { - val edge = edgeWithScore.edge - val edgeScore = edgeWithScore.score - val score = queryParam.scorePropagateOp match { - case "plus" => edgeScore + prevScore - case "divide" => - if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 - else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) - case _ => edgeScore * prevScore - } - - val tsVal = processTimeDecay(queryParam, edge) - val newScore = degreeScore + score - // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge - val newEdge = edge.copyParentEdges(parents) - edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) - } - - val normalized = - if (queryParam.shouldNormalize) normalize(withScores) - else withScores - - normalized - } - } - - def buildResult[R](query: Query, - stepIdx: Int, - stepResultLs: Seq[(QueryRequest, StepResult)], - parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R) - (implicit ev: WithScore[R]): ListBuffer[R] = { - import scala.collection._ - - val results = ListBuffer.empty[R] - val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty - val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty - val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - - var numOfDuplicates = 0 - val queryOption = query.queryOption - val step = query.steps(stepIdx) - val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet - val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet - - stepResultLs.foreach { case (queryRequest, stepInnerResult) => - val queryParam = queryRequest.queryParam - val 
label = queryParam.label - val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) - val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) - - val propsSelectColumns = (for { - column <- queryOption.propsSelectColumns - labelMeta <- label.metaPropsInvMap.get(column) - } yield labelMeta) - - for { - edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) - } { - val edge = edgeWithScore.edge - val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) - // params += (hashKey -> queryParam) // - - /* check if this edge should be exlcuded. */ - if (shouldBeExcluded) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded) { - edgesToInclude.add(filterHashKey) - } - val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) - - sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) - duplicates.get(hashKey) match { - case None => - val newLs = ListBuffer.empty[(FilterHashKey, R)] - newLs += (filterHashKey -> newEdgeWithScore) - duplicates += (hashKey -> newLs) // - case Some(old) => - numOfDuplicates += 1 - old += (filterHashKey -> newEdgeWithScore) // - } - } - } - } - - - if (numOfDuplicates == 0) { - // no duplicates at all. - for { - (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - } { - results += edgeWithScore - } - } else { - // need to resolve duplicates. - val seen = new mutable.HashSet[HashKey]() - for { - (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - if !seen.contains(hashKey) - } { - // val queryParam = params(hashKey) - processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => - if (ev.score(duplicate) >= queryParam.threshold) { - seen += hashKey - results += duplicate - } - } - } - } - results - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7d082255/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 5e23f9b..f061160 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -25,25 +25,15 @@ import java.util.concurrent.{Executors, TimeUnit} import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} -import org.apache.s2graph.core.GraphExceptions.LabelNotExistException -import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.features.S2GraphVariables import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage -import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, Storage} +import org.apache.s2graph.core.storage.{MutateResponse, Storage} import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.PostProcess._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} -import org.apache.tinkerpop.gremlin.process.computer.GraphComputer import 
org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies -import org.apache.tinkerpop.gremlin.structure -import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions -import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables} -import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper} -import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex} -import play.api.libs.json.{JsObject, Json} +import org.apache.tinkerpop.gremlin.structure.{Edge, Graph} import scala.collection.JavaConversions._ import scala.collection.mutable @@ -577,6 +567,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val elementBuilder = new GraphElementBuilder(this) + val traversalHelper = new TraversalHelper(this) + def getStorage(service: Service): Storage = { storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) } @@ -671,48 +663,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap buildLastStepInnerResult: Boolean = false): Future[StepResult] = { if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) else { - val edgeWithScoreLs = stepInnerResult.edgeWithScores - - val q = orgQuery - val queryOption = orgQuery.queryOption - val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None - val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) - val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) - val step = q.steps(stepIdx) - - val alreadyVisited = - if (stepIdx == 0) Map.empty[(LabelWithDirection, S2VertexLike), Boolean] - else alreadyVisitedVertices(stepInnerResult.edgeWithScores) - - val initial = (Map.empty[S2VertexLike, Double], Map.empty[S2VertexLike, ArrayBuffer[EdgeWithScore]]) - val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) => - val key = edgeWithScore.edge.tgtVertex - val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score - val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore]) - buffer += edgeWithScore - (sum + (key -> newScore), group + (key -> buffer)) - } - val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold) - val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2) - - val nextStepSrcVertices = if (prevStepLimit >= 0) { - groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) - } else { - groupedByFiltered.toSeq - } - - val queryRequests = for { - (vertex, prevStepScore) <- nextStepSrcVertices - queryParam <- step.queryParams - } yield { - val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) - val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0 - QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight) - } + val (alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean], prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = + traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult) val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) - filterEdges(orgQuery, stepIdx, queryRequests, + traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) } } 
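For reference, the hunk above replaces the inlined next-step bookkeeping in S2Graph with two calls on the new helper. A minimal Scala sketch of the resulting flow, using only names visible in this diff; the wrapper object and method below are invented for illustration, and `fetches` is assumed to be reachable from the call site:

import org.apache.s2graph.core._
import scala.concurrent.{ExecutionContext, Future}

object StepExpansionSketch {
  // Illustrative wrapper: mirrors the call sequence shown in the S2Graph hunk above.
  def expandStep(graph: S2Graph, orgQuery: Query, stepIdx: Int, stepInnerResult: StepResult)
                (implicit ec: ExecutionContext): Future[StepResult] = {
    // 1. derive already-visited vertices, the parent-edge map, and per-vertex query requests
    val (alreadyVisited, prevStepTgtVertexIdEdges, queryRequests) =
      graph.traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult)

    // 2. fetch candidate edges for each request (assumes fetches is accessible here)
    val fetchedLs = graph.fetches(queryRequests, prevStepTgtVertexIdEdges)

    // 3. filter, dedupe, group, and order the fetched edges into the next StepResult
    graph.traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, fetchedLs,
      orgQuery.steps(stepIdx).queryParams, alreadyVisited,
      buildLastStepInnerResult = false, parentEdges = prevStepTgtVertexIdEdges)(ec)
  }
}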
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7d082255/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala new file mode 100644 index 0000000..58da145 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -0,0 +1,442 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey} +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection, VertexId} +import org.apache.s2graph.core.utils.logger + +import scala.annotation.tailrec +import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.concurrent.Future +import scala.util.Random + +object TraversalHelper { + @tailrec + final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { + if (range < sampleNumber || set.size == sampleNumber) set + else randomInt(sampleNumber, range, set + Random.nextInt(range)) + } + + def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { + if (edges.size <= n) { + edges + } else { + val plainEdges = if (queryRequest.queryParam.offset == 0) { + edges.tail + } else edges + + val randoms = randomInt(n, plainEdges.size) + var samples = List.empty[EdgeWithScore] + var idx = 0 + plainEdges.foreach { e => + if (randoms.contains(idx)) samples = e :: samples + idx += 1 + } + samples + } + } + + def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } + edgeWithScores.map { edgeWithScore => + edgeWithScore.copy(score = edgeWithScore.score / sum) + } + } + + def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = { + val vertices = for { + edgeWithScore <- edgeWithScoreLs + edge = edgeWithScore.edge + vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex + } yield (edge.labelWithDir, vertex) -> true + + vertices.toMap + } + + /** common methods for filter out, transform, aggregate queryResult */ + def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = { + for { + convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree + } yield convertedEdge + } + + def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = { + /* process time decay */ + val tsVal = queryParam.timeDecay match { + case None => 1.0 + case Some(timeDecay) => + val tsVal = try { + val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) + innerValWithTsOpt.map { innerValWithTs => + val innerVal = innerValWithTs.innerVal + timeDecay.labelMeta.dataType match { + case InnerVal.LONG => innerVal.value match { + case n: BigDecimal => n.bigDecimal.longValue() + case _ => innerVal.toString().toLong + } + case _ => innerVal.toString().toLong + } + } getOrElse (edge.ts) + } catch { + case e: Exception => + logger.error(s"processTimeDecay error. 
${edge.toLogString}", e) + edge.ts + } + val timeDiff = queryParam.timestamp - tsVal + timeDecay.decay(timeDiff) + } + + tsVal + } + + def processDuplicates[R](queryParam: QueryParam, + duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = { + + if (queryParam.label.consistencyLevel != "strong") { + //TODO: + queryParam.duplicatePolicy match { + case DuplicatePolicy.First => Seq(duplicates.head) + case DuplicatePolicy.Raw => duplicates + case DuplicatePolicy.CountSum => + val countSum = duplicates.size + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) + case _ => + val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) + } + } else { + duplicates + } + } + + def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = { + val src = edge.srcVertex.innerId.hashCode() + val tgt = edge.tgtVertex.innerId.hashCode() + val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) + val filterHashKey = (src, tgt) + + (hashKey, filterHashKey) + } +} + + +class TraversalHelper(graph: S2GraphLike) { + import TraversalHelper._ + + def buildNextStepQueryRequests(orgQuery: Query, stepIdx: Int, stepInnerResult: StepResult) = { + val edgeWithScoreLs = stepInnerResult.edgeWithScores + + val q = orgQuery + val queryOption = orgQuery.queryOption + val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None + val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) + val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) + val step = q.steps(stepIdx) + + val alreadyVisited = + if (stepIdx == 0) Map.empty[(LabelWithDirection, S2VertexLike), Boolean] + else alreadyVisitedVertices(stepInnerResult.edgeWithScores) + + val initial = (Map.empty[S2VertexLike, Double], Map.empty[S2VertexLike, ArrayBuffer[EdgeWithScore]]) + val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) => + val key = edgeWithScore.edge.tgtVertex + val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score + val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore]) + buffer += edgeWithScore + (sum + (key -> newScore), group + (key -> buffer)) + } + val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold) + val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2) + + val nextStepSrcVertices = if (prevStepLimit >= 0) { + groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) + } else { + groupedByFiltered.toSeq + } + + val queryRequests = for { + (vertex, prevStepScore) <- nextStepSrcVertices + queryParam <- step.queryParams + } yield { + val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) + val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0 + QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight) + } + (alreadyVisited, prevStepTgtVertexIdEdges, queryRequests) + } + + def filterEdges(q: Query, + stepIdx: Int, + queryRequests: Seq[QueryRequest], + queryResultLsFuture: Future[Seq[StepResult]], + queryParams: Seq[QueryParam], + alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty, + buildLastStepInnerResult: Boolean = true, + 
parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { + + queryResultLsFuture.map { queryRequestWithResultLs => + val (cursors, failCount) = { + val _cursors = ArrayBuffer.empty[Array[Byte]] + var _failCount = 0 + + queryRequestWithResultLs.foreach { stepResult => + _cursors.append(stepResult.cursors: _*) + _failCount += stepResult.failCount + } + + _cursors -> _failCount + } + + + if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) + else { + val isLastStep = stepIdx == q.steps.size - 1 + val queryOption = q.queryOption + val step = q.steps(stepIdx) + + val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) + val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult + val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) + + if (shouldBuildInnerResults) { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + edgeWithScore + } + + /* process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) + + } else { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /* Select */ + val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) + + // val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + + val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) + /* OrderBy */ + val orderByValues = + if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) + else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) + + /* StepGroupBy */ + val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) + + /* GroupBy */ + val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) + + /* FilterOut */ + val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) + + newEdgeWithScore.copy(orderByValues = orderByValues, + stepGroupByValues = stepGroupByValues, + groupByValues = groupByValues, + filterOutValues = filterOutValues) + } + + /* process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + + /* process ordered list */ + val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil + + /* process grouped list */ + val grouped = + if (queryOption.groupBy.keys.isEmpty) Nil + else { + val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() + results.groupBy { edgeWithScore => + // edgeWithScore.groupByValues.map(_.map(_.toString)) + edgeWithScore.groupByValues + }.foreach { case (k, ls) => + val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) + + val newScoreSum = scoreSum + + /* + * watch out here. by calling toString on Any, we lose type information which will be used + * later for toJson. 
+ */ + if (merged.nonEmpty) { + val newKey = merged.head.groupByValues + agg += ((newKey, (newScoreSum, merged))) + } + } + agg.toSeq.sortBy(_._2._1 * -1) + } + + StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) + } + } + } + } + + + + + private def toEdgeWithScores(queryRequest: QueryRequest, + stepResult: StepResult, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val prevScore = queryRequest.prevStepScore + val labelWeight = queryRequest.labelWeight + val edgeWithScores = stepResult.edgeWithScores + + val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent + val parents = if (shouldBuildParents) { + parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /* Select */ + val mergedPropsWithTs = + if (queryOption.selectColumns.isEmpty) { + edge.propertyValuesInner() + } else { + val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp)) + edge.propertyValues(queryOption.selectColumns) ++ initial + } + + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + edgeWithScore.copy(edge = newEdge) + } + } else Nil + + // skip + if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores + else { + val degreeScore = 0.0 + + val sampled = + if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + else edgeWithScores + + val withScores = for { + edgeWithScore <- sampled + } yield { + val edge = edgeWithScore.edge + val edgeScore = edgeWithScore.score + val score = queryParam.scorePropagateOp match { + case "plus" => edgeScore + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) + case _ => edgeScore * prevScore + } + + val tsVal = processTimeDecay(queryParam, edge) + val newScore = degreeScore + score + // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge + val newEdge = edge.copyParentEdges(parents) + edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) + } + + val normalized = + if (queryParam.shouldNormalize) normalize(withScores) + else withScores + + normalized + } + } + + private def buildResult[R](query: Query, + stepIdx: Int, + stepResultLs: Seq[(QueryRequest, StepResult)], + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R) + (implicit ev: WithScore[R]): ListBuffer[R] = { + import scala.collection._ + + val results = ListBuffer.empty[R] + val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty + val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty + val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + + var numOfDuplicates = 0 + val queryOption = query.queryOption + val step = query.steps(stepIdx) + val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet + val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet + + stepResultLs.foreach { case (queryRequest, stepInnerResult) => + val queryParam = 
queryRequest.queryParam + val label = queryParam.label + val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) + val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) + + val propsSelectColumns = (for { + column <- queryOption.propsSelectColumns + labelMeta <- label.metaPropsInvMap.get(column) + } yield labelMeta) + + for { + edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) + } { + val edge = edgeWithScore.edge + val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) + // params += (hashKey -> queryParam) // + + /* check if this edge should be exlcuded. */ + if (shouldBeExcluded) { + edgesToExclude.add(filterHashKey) + } else { + if (shouldBeIncluded) { + edgesToInclude.add(filterHashKey) + } + val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) + + sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) + duplicates.get(hashKey) match { + case None => + val newLs = ListBuffer.empty[(FilterHashKey, R)] + newLs += (filterHashKey -> newEdgeWithScore) + duplicates += (hashKey -> newLs) // + case Some(old) => + numOfDuplicates += 1 + old += (filterHashKey -> newEdgeWithScore) // + } + } + } + } + + + if (numOfDuplicates == 0) { + // no duplicates at all. + for { + (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + } { + results += edgeWithScore + } + } else { + // need to resolve duplicates. + val seen = new mutable.HashSet[HashKey]() + for { + (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + if !seen.contains(hashKey) + } { + // val queryParam = params(hashKey) + processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => + if (ev.score(duplicate) >= queryParam.threshold) { + seen += hashKey + results += duplicate + } + } + } + } + results + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7d082255/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index 3074b4e..28b15d0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.PostProcess._ +import org.apache.s2graph.core.TraversalHelper._ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.parsers.WhereParser
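The StorageIO change above is only an import swap (PostProcess._ to TraversalHelper._), since the stateless helpers now live on TraversalHelper's companion object. A minimal usage sketch follows; the wrapper object and method are invented for illustration, while the helper names and signatures are the ones added in TraversalHelper.scala above:

import org.apache.s2graph.core.{EdgeWithScore, QueryParam}
import org.apache.s2graph.core.TraversalHelper.{normalize, processTimeDecay}

object TraversalHelperUsageSketch {
  // Re-weights each edge score by the label's time-decay factor (processTimeDecay returns
  // 1.0 when no timeDecay is configured), then rescales so the scores sum to 1.0.
  def decayAndNormalize(queryParam: QueryParam,
                        edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
    val decayed = edgeWithScores.map { ews =>
      ews.copy(score = ews.score * processTimeDecay(queryParam, ews.edge))
    }
    normalize(decayed)
  }
}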
