move S2Graph global helpers into PostProcess.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/42b7702e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/42b7702e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/42b7702e Branch: refs/heads/master Commit: 42b7702e5d32e32b8888067d21031b9eb13ce1c1 Parents: cdfa0c3 Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 3 20:48:07 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Nov 3 20:48:07 2017 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/PostProcess.scala | 386 +++++++++++++++++- .../scala/org/apache/s2graph/core/S2Graph.scala | 390 +------------------ .../apache/s2graph/core/storage/StorageIO.scala | 2 +- 3 files changed, 392 insertions(+), 386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/42b7702e/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 3017749..4549d84 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -24,15 +24,19 @@ import java.util.Base64 import com.google.protobuf.ByteString import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException} import org.apache.s2graph.core.mysqls._ -import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs} +import org.apache.s2graph.core.types._ import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey} import org.apache.s2graph.core.rest.RequestParser import 
org.apache.s2graph.core.utils.logger import play.api.libs.json.{Json, _} +import scala.annotation.tailrec import scala.collection.JavaConversions._ import scala.collection.immutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.concurrent.Future +import scala.util.Random object PostProcess { @@ -277,4 +281,384 @@ object PostProcess { withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery) } } + + /** Global helper functions */ + @tailrec + final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { + if (range < sampleNumber || set.size == sampleNumber) set + else randomInt(sampleNumber, range, set + Random.nextInt(range)) + } + + def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { + if (edges.size <= n) { + edges + } else { + val plainEdges = if (queryRequest.queryParam.offset == 0) { + edges.tail + } else edges + + val randoms = randomInt(n, plainEdges.size) + var samples = List.empty[EdgeWithScore] + var idx = 0 + plainEdges.foreach { e => + if (randoms.contains(idx)) samples = e :: samples + idx += 1 + } + samples + } + } + + def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } + edgeWithScores.map { edgeWithScore => + edgeWithScore.copy(score = edgeWithScore.score / sum) + } + } + + def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = { + val vertices = for { + edgeWithScore <- edgeWithScoreLs + edge = edgeWithScore.edge + vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex + } yield (edge.labelWithDir, vertex) -> true + + vertices.toMap + } + + /** common methods for filter out, transform, aggregate queryResult */ + def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: 
Option[Step]): Seq[S2EdgeLike] = { + for { + convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree + } yield convertedEdge + } + + def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = { + /* process time decay */ + val tsVal = queryParam.timeDecay match { + case None => 1.0 + case Some(timeDecay) => + val tsVal = try { + val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) + innerValWithTsOpt.map { innerValWithTs => + val innerVal = innerValWithTs.innerVal + timeDecay.labelMeta.dataType match { + case InnerVal.LONG => innerVal.value match { + case n: BigDecimal => n.bigDecimal.longValue() + case _ => innerVal.toString().toLong + } + case _ => innerVal.toString().toLong + } + } getOrElse (edge.ts) + } catch { + case e: Exception => + logger.error(s"processTimeDecay error. ${edge.toLogString}", e) + edge.ts + } + val timeDiff = queryParam.timestamp - tsVal + timeDecay.decay(timeDiff) + } + + tsVal + } + + def processDuplicates[R](queryParam: QueryParam, + duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = { + + if (queryParam.label.consistencyLevel != "strong") { + //TODO: + queryParam.duplicatePolicy match { + case DuplicatePolicy.First => Seq(duplicates.head) + case DuplicatePolicy.Raw => duplicates + case DuplicatePolicy.CountSum => + val countSum = duplicates.size + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) + case _ => + val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) + } + } else { + duplicates + } + } + + def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = { + val src = edge.srcVertex.innerId.hashCode() + val tgt = 
edge.tgtVertex.innerId.hashCode() + val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) + val filterHashKey = (src, tgt) + + (hashKey, filterHashKey) + } + + def filterEdges(q: Query, + stepIdx: Int, + queryRequests: Seq[QueryRequest], + queryResultLsFuture: Future[Seq[StepResult]], + queryParams: Seq[QueryParam], + alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty, + buildLastStepInnerResult: Boolean = true, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { + + queryResultLsFuture.map { queryRequestWithResultLs => + val (cursors, failCount) = { + val _cursors = ArrayBuffer.empty[Array[Byte]] + var _failCount = 0 + + queryRequestWithResultLs.foreach { stepResult => + _cursors.append(stepResult.cursors: _*) + _failCount += stepResult.failCount + } + + _cursors -> _failCount + } + + + if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) + else { + val isLastStep = stepIdx == q.steps.size - 1 + val queryOption = q.queryOption + val step = q.steps(stepIdx) + + val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) + val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult + val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) + + if (shouldBuildInnerResults) { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + edgeWithScore + } + + /* process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) + + } else { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /* Select */ + val 
mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) + + // val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + + val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) + /* OrderBy */ + val orderByValues = + if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) + else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) + + /* StepGroupBy */ + val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) + + /* GroupBy */ + val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) + + /* FilterOut */ + val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) + + newEdgeWithScore.copy(orderByValues = orderByValues, + stepGroupByValues = stepGroupByValues, + groupByValues = groupByValues, + filterOutValues = filterOutValues) + } + + /* process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + + /* process ordered list */ + val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil + + /* process grouped list */ + val grouped = + if (queryOption.groupBy.keys.isEmpty) Nil + else { + val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() + results.groupBy { edgeWithScore => + // edgeWithScore.groupByValues.map(_.map(_.toString)) + edgeWithScore.groupByValues + }.foreach { case (k, ls) => + val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) + + val newScoreSum = scoreSum + + /* + * watch out here. by calling toString on Any, we lose type information which will be used + * later for toJson. 
+ */ + if (merged.nonEmpty) { + val newKey = merged.head.groupByValues + agg += ((newKey, (newScoreSum, merged))) + } + } + agg.toSeq.sortBy(_._2._1 * -1) + } + + StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) + } + } + } + } + + def toEdgeWithScores(queryRequest: QueryRequest, + stepResult: StepResult, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val prevScore = queryRequest.prevStepScore + val labelWeight = queryRequest.labelWeight + val edgeWithScores = stepResult.edgeWithScores + + val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent + val parents = if (shouldBuildParents) { + parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /* Select */ + val mergedPropsWithTs = + if (queryOption.selectColumns.isEmpty) { + edge.propertyValuesInner() + } else { + val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp)) + edge.propertyValues(queryOption.selectColumns) ++ initial + } + + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + edgeWithScore.copy(edge = newEdge) + } + } else Nil + + // skip + if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores + else { + val degreeScore = 0.0 + + val sampled = + if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + else edgeWithScores + + val withScores = for { + edgeWithScore <- sampled + } yield { + val edge = edgeWithScore.edge + val edgeScore = edgeWithScore.score + val score = queryParam.scorePropagateOp match { + case "plus" => edgeScore + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edgeScore / (prevScore + 
queryParam.scorePropagateShrinkage) + case _ => edgeScore * prevScore + } + + val tsVal = processTimeDecay(queryParam, edge) + val newScore = degreeScore + score + // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge + val newEdge = edge.copyParentEdges(parents) + edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) + } + + val normalized = + if (queryParam.shouldNormalize) normalize(withScores) + else withScores + + normalized + } + } + + def buildResult[R](query: Query, + stepIdx: Int, + stepResultLs: Seq[(QueryRequest, StepResult)], + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R) + (implicit ev: WithScore[R]): ListBuffer[R] = { + import scala.collection._ + + val results = ListBuffer.empty[R] + val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty + val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty + val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + + var numOfDuplicates = 0 + val queryOption = query.queryOption + val step = query.steps(stepIdx) + val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet + val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet + + stepResultLs.foreach { case (queryRequest, stepInnerResult) => + val queryParam = queryRequest.queryParam + val label = queryParam.label + val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) + val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) + + val propsSelectColumns = (for { + column <- queryOption.propsSelectColumns + labelMeta <- label.metaPropsInvMap.get(column) + } yield labelMeta) + + for { + edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) 
+ } { + val edge = edgeWithScore.edge + val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) + // params += (hashKey -> queryParam) // + + /* check if this edge should be excluded. */ + if (shouldBeExcluded) { + edgesToExclude.add(filterHashKey) + } else { + if (shouldBeIncluded) { + edgesToInclude.add(filterHashKey) + } + val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) + + sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) + duplicates.get(hashKey) match { + case None => + val newLs = ListBuffer.empty[(FilterHashKey, R)] + newLs += (filterHashKey -> newEdgeWithScore) + duplicates += (hashKey -> newLs) // + case Some(old) => + numOfDuplicates += 1 + old += (filterHashKey -> newEdgeWithScore) // + } + } + } + } + + + if (numOfDuplicates == 0) { + // no duplicates at all. + for { + (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + } { + results += edgeWithScore + } + } else { + // need to resolve duplicates. 
+ val seen = new mutable.HashSet[HashKey]() + for { + (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + if !seen.contains(hashKey) + } { + // val queryParam = params(hashKey) + processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => + if (ev.score(duplicate) >= queryParam.threshold) { + seen += hashKey + results += duplicate + } + } + } + } + results + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/42b7702e/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 8bb95fc..82d0c6a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -22,6 +22,7 @@ package org.apache.s2graph.core import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.{Executors, TimeUnit} + import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} import org.apache.s2graph.core.GraphExceptions.LabelNotExistException @@ -31,8 +32,9 @@ import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage -import org.apache.s2graph.core.storage.{ MutateResponse, SKeyValue, Storage} +import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, Storage} import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.PostProcess._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} import 
org.apache.tinkerpop.gremlin.process.computer.GraphComputer import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies @@ -42,13 +44,13 @@ import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables} import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper} import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex} import play.api.libs.json.{JsObject, Json} -import scala.annotation.tailrec + import scala.collection.JavaConversions._ import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration.Duration -import scala.util.{Random, Try} +import scala.util.Try object S2Graph { @@ -156,386 +158,6 @@ object S2Graph { } ConfigFactory.parseMap(kvs) } - - /** Global helper functions */ - @tailrec - final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (range < sampleNumber || set.size == sampleNumber) set - else randomInt(sampleNumber, range, set + Random.nextInt(range)) - } - - def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { - if (edges.size <= n) { - edges - } else { - val plainEdges = if (queryRequest.queryParam.offset == 0) { - edges.tail - } else edges - - val randoms = randomInt(n, plainEdges.size) - var samples = List.empty[EdgeWithScore] - var idx = 0 - plainEdges.foreach { e => - if (randoms.contains(idx)) samples = e :: samples - idx += 1 - } - samples - } - } - - def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { - val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } - edgeWithScores.map { edgeWithScore => - edgeWithScore.copy(score = edgeWithScore.score / sum) - } - } - - def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = { - val 
vertices = for { - edgeWithScore <- edgeWithScoreLs - edge = edgeWithScore.edge - vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex - } yield (edge.labelWithDir, vertex) -> true - - vertices.toMap - } - - /** common methods for filter out, transform, aggregate queryResult */ - def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = { - for { - convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree - } yield convertedEdge - } - - def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = { - /* process time decay */ - val tsVal = queryParam.timeDecay match { - case None => 1.0 - case Some(timeDecay) => - val tsVal = try { - val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) - innerValWithTsOpt.map { innerValWithTs => - val innerVal = innerValWithTs.innerVal - timeDecay.labelMeta.dataType match { - case InnerVal.LONG => innerVal.value match { - case n: BigDecimal => n.bigDecimal.longValue() - case _ => innerVal.toString().toLong - } - case _ => innerVal.toString().toLong - } - } getOrElse (edge.ts) - } catch { - case e: Exception => - logger.error(s"processTimeDecay error. 
${edge.toLogString}", e) - edge.ts - } - val timeDiff = queryParam.timestamp - tsVal - timeDecay.decay(timeDiff) - } - - tsVal - } - - def processDuplicates[R](queryParam: QueryParam, - duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = { - - if (queryParam.label.consistencyLevel != "strong") { - //TODO: - queryParam.duplicatePolicy match { - case DuplicatePolicy.First => Seq(duplicates.head) - case DuplicatePolicy.Raw => duplicates - case DuplicatePolicy.CountSum => - val countSum = duplicates.size - val (headFilterHashKey, headEdgeWithScore) = duplicates.head - Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) - case _ => - val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } - val (headFilterHashKey, headEdgeWithScore) = duplicates.head - Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) - } - } else { - duplicates - } - } - - def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = { - val src = edge.srcVertex.innerId.hashCode() - val tgt = edge.tgtVertex.innerId.hashCode() - val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) - val filterHashKey = (src, tgt) - - (hashKey, filterHashKey) - } - - def filterEdges(q: Query, - stepIdx: Int, - queryRequests: Seq[QueryRequest], - queryResultLsFuture: Future[Seq[StepResult]], - queryParams: Seq[QueryParam], - alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty, - buildLastStepInnerResult: Boolean = true, - parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { - - queryResultLsFuture.map { queryRequestWithResultLs => - val (cursors, failCount) = { - val _cursors = ArrayBuffer.empty[Array[Byte]] - var _failCount = 0 - - queryRequestWithResultLs.foreach { stepResult => - _cursors.append(stepResult.cursors: _*) - _failCount += 
stepResult.failCount - } - - _cursors -> _failCount - } - - - if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) - else { - val isLastStep = stepIdx == q.steps.size - 1 - val queryOption = q.queryOption - val step = q.steps(stepIdx) - - val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) - val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult - val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) - - if (shouldBuildInnerResults) { - val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => - edgeWithScore - } - - /* process step group by */ - val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) - - } else { - val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => - val edge = edgeWithScore.edge - val score = edgeWithScore.score - val label = edgeWithScore.label - - /* Select */ - val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) - -// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) - val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) - - val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) - /* OrderBy */ - val orderByValues = - if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) - else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) - - /* StepGroupBy */ - val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) - - /* GroupBy */ - val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) - - /* FilterOut */ - val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) - - newEdgeWithScore.copy(orderByValues = orderByValues, - stepGroupByValues = stepGroupByValues, - groupByValues = groupByValues, - filterOutValues = 
filterOutValues) - } - - /* process step group by */ - val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - - /* process ordered list */ - val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil - - /* process grouped list */ - val grouped = - if (queryOption.groupBy.keys.isEmpty) Nil - else { - val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() - results.groupBy { edgeWithScore => - // edgeWithScore.groupByValues.map(_.map(_.toString)) - edgeWithScore.groupByValues - }.foreach { case (k, ls) => - val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) - - val newScoreSum = scoreSum - - /* - * watch out here. by calling toString on Any, we lose type information which will be used - * later for toJson. - */ - if (merged.nonEmpty) { - val newKey = merged.head.groupByValues - agg += ((newKey, (newScoreSum, merged))) - } - } - agg.toSeq.sortBy(_._2._1 * -1) - } - - StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) - } - } - } - } - - private def toEdgeWithScores(queryRequest: QueryRequest, - stepResult: StepResult, - parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val prevScore = queryRequest.prevStepScore - val labelWeight = queryRequest.labelWeight - val edgeWithScores = stepResult.edgeWithScores - - val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent - val parents = if (shouldBuildParents) { - parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => - val edge = edgeWithScore.edge - val score = edgeWithScore.score - val label = edgeWithScore.label - - /* Select */ - val mergedPropsWithTs = - if (queryOption.selectColumns.isEmpty) { - edge.propertyValuesInner() - } else { - val initial = Map(LabelMeta.timestamp 
-> edge.propertyValueInner(LabelMeta.timestamp)) - edge.propertyValues(queryOption.selectColumns) ++ initial - } - - val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) - edgeWithScore.copy(edge = newEdge) - } - } else Nil - - // skip - if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores - else { - val degreeScore = 0.0 - - val sampled = - if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) - else edgeWithScores - - val withScores = for { - edgeWithScore <- sampled - } yield { - val edge = edgeWithScore.edge - val edgeScore = edgeWithScore.score - val score = queryParam.scorePropagateOp match { - case "plus" => edgeScore + prevScore - case "divide" => - if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 - else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) - case _ => edgeScore * prevScore - } - - val tsVal = processTimeDecay(queryParam, edge) - val newScore = degreeScore + score - // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge - val newEdge = edge.copyParentEdges(parents) - edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) - } - - val normalized = - if (queryParam.shouldNormalize) normalize(withScores) - else withScores - - normalized - } - } - - private def buildResult[R](query: Query, - stepIdx: Int, - stepResultLs: Seq[(QueryRequest, StepResult)], - parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R) - (implicit ev: WithScore[R]): ListBuffer[R] = { - import scala.collection._ - - val results = ListBuffer.empty[R] - val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty - val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty - val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - 
- var numOfDuplicates = 0 - val queryOption = query.queryOption - val step = query.steps(stepIdx) - val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet - val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet - - stepResultLs.foreach { case (queryRequest, stepInnerResult) => - val queryParam = queryRequest.queryParam - val label = queryParam.label - val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) - val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) - - val propsSelectColumns = (for { - column <- queryOption.propsSelectColumns - labelMeta <- label.metaPropsInvMap.get(column) - } yield labelMeta) - - for { - edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) - } { - val edge = edgeWithScore.edge - val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) - // params += (hashKey -> queryParam) // - - /* check if this edge should be exlcuded. */ - if (shouldBeExcluded) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded) { - edgesToInclude.add(filterHashKey) - } - val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) - - sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) - duplicates.get(hashKey) match { - case None => - val newLs = ListBuffer.empty[(FilterHashKey, R)] - newLs += (filterHashKey -> newEdgeWithScore) - duplicates += (hashKey -> newLs) // - case Some(old) => - numOfDuplicates += 1 - old += (filterHashKey -> newEdgeWithScore) // - } - } - } - } - - - if (numOfDuplicates == 0) { - // no duplicates at all. - for { - (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - } { - results += edgeWithScore - } - } else { - // need to resolve duplicates. 
- val seen = new mutable.HashSet[HashKey]() - for { - (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - if !seen.contains(hashKey) - } { - // val queryParam = params(hashKey) - processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => - if (ev.score(duplicate) >= queryParam.threshold) { - seen += hashKey - results += duplicate - } - } - } - } - results - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/42b7702e/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index d0a59b2..3074b4e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.S2Graph.{convertEdges, normalize, processTimeDecay, sample} +import org.apache.s2graph.core.PostProcess._ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.parsers.WhereParser
