http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala deleted file mode 100644 index 38477b4..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ /dev/null @@ -1,1238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core - -import java.util -import java.util.concurrent.Executors - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.commons.configuration.Configuration -import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException} -import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.mysqls._ -import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage -import org.apache.s2graph.core.storage.{SKeyValue, Storage} -import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger} -import org.apache.tinkerpop.gremlin.process.computer.GraphComputer -import org.apache.tinkerpop.gremlin.structure -import org.apache.tinkerpop.gremlin.structure.Graph.Variables -import org.apache.tinkerpop.gremlin.structure.{Graph => TpGraph, Transaction} -import play.api.libs.json.{JsObject, Json} -import scala.annotation.tailrec -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import scala.concurrent._ -import scala.util.{Random, Try} - -object Graph { - - type HashKey = (Int, Int, Int, Int, Boolean) - type FilterHashKey = (Int, Int) - - - val DefaultScore = 1.0 - - private val DefaultConfigs: Map[String, AnyRef] = Map( - "hbase.zookeeper.quorum" -> "localhost", - "hbase.table.name" -> "s2graph", - "hbase.table.compression.algorithm" -> "gz", - "phase" -> "dev", - "db.default.driver" -> "org.h2.Driver", - "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL", - "db.default.password" -> "graph", - "db.default.user" -> "graph", - "cache.max.size" -> java.lang.Integer.valueOf(10000), - "cache.ttl.seconds" -> java.lang.Integer.valueOf(60), - "hbase.client.retries.number" -> java.lang.Integer.valueOf(20), - "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort), - "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000), - "max.retry.number" -> java.lang.Integer.valueOf(100), - "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10), - "max.back.off" -> java.lang.Integer.valueOf(100), - "back.off.timeout" -> java.lang.Integer.valueOf(1000), - "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), - "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), - "delete.all.fetch.count" -> java.lang.Integer.valueOf(200), - "future.cache.max.size" -> java.lang.Integer.valueOf(100000), - "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), - "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), - "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), - "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000), - "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), - "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), - "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), - "s2graph.storage.backend" -> "hbase", - "query.hardlimit" -> java.lang.Integer.valueOf(100000), - "hbase.zookeeper.znode.parent" -> "/hbase", - "query.log.sample.rate" -> Double.box(0.05) - ) - - var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) - - - - def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = { - val storageBackend = config.getString("s2graph.storage.backend") - logger.info(s"[InitStorage]: $storageBackend") - - storageBackend match { - case "hbase" => new AsynchbaseStorage(graph, config)(ec) - case _ => throw new RuntimeException("not supported storage.") - } - } - - def parseCacheConfig(config: Config, prefix: String): Config = { - import scala.collection.JavaConversions._ - - val kvs = new java.util.HashMap[String, AnyRef]() - for { - entry <- config.entrySet() - (k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix) - } yield { - val newKey = k.replace(prefix, "") - kvs.put(newKey, v.unwrapped()) - } - ConfigFactory.parseMap(kvs) - } - - /** Global helper functions */ - @tailrec - final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (range < sampleNumber || set.size == sampleNumber) set - else randomInt(sampleNumber, range, set + Random.nextInt(range)) - } - - def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { - if (edges.size <= n) { - edges - } else { - val plainEdges = if (queryRequest.queryParam.offset == 0) { - edges.tail - } else edges - - val randoms = randomInt(n, plainEdges.size) - var samples = List.empty[EdgeWithScore] - var idx = 0 - plainEdges.foreach { e => - if (randoms.contains(idx)) samples = e :: samples - idx += 1 - } - samples - } - } - - def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { - val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } - edgeWithScores.map { edgeWithScore => - edgeWithScore.copy(score = edgeWithScore.score / sum) - } - } - - def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = { - val vertices = for { - edgeWithScore <- edgeWithScoreLs - edge = edgeWithScore.edge - vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex - } yield (edge.labelWithDir, vertex) -> true - - vertices.toMap - } - - /** common methods for filter out, transform, aggregate queryResult */ - def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { - for { - convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree - } yield convertedEdge - } - - def processTimeDecay(queryParam: QueryParam, edge: Edge) = { - /* process time decay */ - val tsVal = queryParam.timeDecay match { - case None => 1.0 - case Some(timeDecay) => - val tsVal = try { - val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) - innerValWithTsOpt.map { innerValWithTs => - val innerVal = innerValWithTs.innerVal - timeDecay.labelMeta.dataType match { - case InnerVal.LONG => innerVal.value match { - case n: BigDecimal => n.bigDecimal.longValue() - case _ => innerVal.toString().toLong - } - case _ => innerVal.toString().toLong - } - } getOrElse (edge.ts) - } catch { - case e: Exception => - logger.error(s"processTimeDecay error. ${edge.toLogString}", e) - edge.ts - } - val timeDiff = queryParam.timestamp - tsVal - timeDecay.decay(timeDiff) - } - - tsVal - } - - def processDuplicates[T](queryParam: QueryParam, - duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = { - - if (queryParam.label.consistencyLevel != "strong") { - //TODO: - queryParam.duplicatePolicy match { - case DuplicatePolicy.First => Seq(duplicates.head) - case DuplicatePolicy.Raw => duplicates - case DuplicatePolicy.CountSum => - val countSum = duplicates.size - val (headFilterHashKey, headEdgeWithScore) = duplicates.head - Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) - case _ => - val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } - val (headFilterHashKey, headEdgeWithScore) = duplicates.head - Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) - } - } else { - duplicates - } - } - - def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { - val src = edge.srcVertex.innerId.hashCode() - val tgt = edge.tgtVertex.innerId.hashCode() - val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) - val filterHashKey = (src, tgt) - - (hashKey, filterHashKey) - } - - def filterEdges(q: Query, - stepIdx: Int, - queryRequests: Seq[QueryRequest], - queryResultLsFuture: Future[Seq[StepResult]], - queryParams: Seq[QueryParam], - alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty, - buildLastStepInnerResult: Boolean = true, - parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { - - queryResultLsFuture.map { queryRequestWithResultLs => - val (cursors, failCount) = { - val _cursors = ArrayBuffer.empty[Array[Byte]] - var _failCount = 0 - - queryRequestWithResultLs.foreach { stepResult => - _cursors.append(stepResult.cursors: _*) - _failCount += stepResult.failCount - } - - _cursors -> _failCount - } - - - if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) - else { - val isLastStep = stepIdx == q.steps.size - 1 - val queryOption = q.queryOption - val step = q.steps(stepIdx) - - val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) - val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult - val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) - - if (shouldBuildInnerResults) { - val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => - edgeWithScore - } - - /** process step group by */ - val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) - - } else { - val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => - val edge = edgeWithScore.edge - val score = edgeWithScore.score - val label = edgeWithScore.label - - /** Select */ - val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) - -// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) - val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) - - val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) - /** OrderBy */ - val orderByValues = - if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) - else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) - - /** StepGroupBy */ - val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) - - /** GroupBy */ - val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) - - /** FilterOut */ - val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) - - newEdgeWithScore.copy(orderByValues = orderByValues, - stepGroupByValues = stepGroupByValues, - groupByValues = groupByValues, - filterOutValues = filterOutValues) - } - - /** process step group by */ - val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - - /** process ordered list */ - val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil - - /** process grouped list */ - val grouped = - if (queryOption.groupBy.keys.isEmpty) Nil - else { - val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() - results.groupBy { edgeWithScore => - // edgeWithScore.groupByValues.map(_.map(_.toString)) - edgeWithScore.groupByValues - }.foreach { case (k, ls) => - val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) - - val newScoreSum = scoreSum - - /** - * watch out here. by calling toString on Any, we lose type information which will be used - * later for toJson. - */ - if (merged.nonEmpty) { - val newKey = merged.head.groupByValues - agg += (newKey -> (newScoreSum, merged)) - } - } - agg.toSeq.sortBy(_._2._1 * -1) - } - - StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) - } - } - } - } - - private def toEdgeWithScores(queryRequest: QueryRequest, - stepResult: StepResult, - parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val prevScore = queryRequest.prevStepScore - val labelWeight = queryRequest.labelWeight - val edgeWithScores = stepResult.edgeWithScores - - val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent - val parents = if (shouldBuildParents) { - parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => - val edge = edgeWithScore.edge - val score = edgeWithScore.score - val label = edgeWithScore.label - - /** Select */ - val mergedPropsWithTs = - if (queryOption.selectColumns.isEmpty) { - edge.propertyValuesInner() - } else { - val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp)) - edge.propertyValues(queryOption.selectColumns) ++ initial - } - - val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) - edgeWithScore.copy(edge = newEdge) - } - } else Nil - - // skip - if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores - else { - val degreeScore = 0.0 - - val sampled = - if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) - else edgeWithScores - - val withScores = for { - edgeWithScore <- sampled - } yield { - val edge = edgeWithScore.edge - val edgeScore = edgeWithScore.score - val score = queryParam.scorePropagateOp match { - case "plus" => edgeScore + prevScore - case "divide" => - if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 - else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) - case _ => edgeScore * prevScore - } - - val tsVal = processTimeDecay(queryParam, edge) - val newScore = degreeScore + score - // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge - val newEdge = edge.copy(parentEdges = parents) - edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) - } - - val normalized = - if (queryParam.shouldNormalize) normalize(withScores) - else withScores - - normalized - } - } - - private def buildResult[T](query: Query, - stepIdx: Int, - stepResultLs: Seq[(QueryRequest, StepResult)], - parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T) - (implicit ev: WithScore[T]): ListBuffer[T] = { - import scala.collection._ - - val results = ListBuffer.empty[T] - val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty - val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty - val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - - var numOfDuplicates = 0 - val queryOption = query.queryOption - val step = query.steps(stepIdx) - val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet - val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet - - stepResultLs.foreach { case (queryRequest, stepInnerResult) => - val queryParam = queryRequest.queryParam - val label = queryParam.label - val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) - val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) - - val propsSelectColumns = (for { - column <- queryOption.propsSelectColumns - labelMeta <- label.metaPropsInvMap.get(column) - } yield labelMeta) - - for { - edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) - } { - val edge = edgeWithScore.edge - val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) - // params += (hashKey -> queryParam) // - - /** check if this edge should be exlcuded. */ - if (shouldBeExcluded) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded) { - edgesToInclude.add(filterHashKey) - } - val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) - - sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) - duplicates.get(hashKey) match { - case None => - val newLs = ListBuffer.empty[(FilterHashKey, T)] - newLs += (filterHashKey -> newEdgeWithScore) - duplicates += (hashKey -> newLs) // - case Some(old) => - numOfDuplicates += 1 - old += (filterHashKey -> newEdgeWithScore) // - } - } - } - } - - - if (numOfDuplicates == 0) { - // no duplicates at all. - for { - (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - } { - results += edgeWithScore - } - } else { - // need to resolve duplicates. - val seen = new mutable.HashSet[HashKey]() - for { - (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - if !seen.contains(hashKey) - } { - // val queryParam = params(hashKey) - processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => - if (ev.score(duplicate) >= queryParam.threshold) { - seen += hashKey - results += duplicate - } - } - } - } - results - } - -} - -class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph { - - import Graph._ - - val config = _config.withFallback(Graph.DefaultConfig) - - Model.apply(config) - Model.loadCache() - - val MaxRetryNum = config.getInt("max.retry.number") - val MaxBackOff = config.getInt("max.back.off") - val BackoffTimeout = config.getInt("back.off.timeout") - val DeleteAllFetchCount = config.getInt("delete.all.fetch.count") - val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") - val FailProb = config.getDouble("hbase.fail.prob") - val LockExpireDuration = config.getInt("lock.expire.time") - val MaxSize = config.getInt("future.cache.max.size") - val ExpireAfterWrite = config.getInt("future.cache.expire.after.write") - val ExpireAfterAccess = config.getInt("future.cache.expire.after.access") - - val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) - - private def confWithFallback(conf: Config): Config = { - conf.withFallback(config) - } - - /** - * TODO: we need to some way to handle malformed configuration for storage. - */ - val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = { - val labels = Label.findAll() - val services = Service.findAll() - - val labelConfigs = labels.flatMap(_.toStorageConfig) - val serviceConfigs = services.flatMap(_.toStorageConfig) - - val configs = (labelConfigs ++ serviceConfigs).map { conf => - confWithFallback(conf) - }.toSet - - val pools = new java.util.HashMap[Config, Storage[_, _]]() - configs.foreach { config => - pools.put(config, Graph.initStorage(this, config)(ec)) - } - - val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]() - - labels.foreach { label => - if (label.storageConfigOpt.isDefined) { - m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get)) - } - } - - services.foreach { service => - if (service.storageConfigOpt.isDefined) { - m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get)) - } - } - - m - } - - val defaultStorage: Storage[_, _] = Graph.initStorage(this, config)(ec) - - /** QueryLevel FutureCache */ - val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty) - - for { - entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey) - (k, v) = (entry.getKey, entry.getValue) - } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") - - def getStorage(service: Service): Storage[_, _] = { - storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) - } - - def getStorage(label: Label): Storage[_, _] = { - storagePool.getOrElse(s"label:${label.label}", defaultStorage) - } - - def flushStorage(): Unit = { - storagePool.foreach { case (_, storage) => - - /** flush is blocking */ - storage.flush() - } - } - - def fallback = Future.successful(StepResult.Empty) - - def checkEdges(edges: Seq[Edge]): Future[StepResult] = { - val futures = for { - edge <- edges - } yield { - fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => - edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label)) - } - } - - Future.sequence(futures).map { edgeWithScoreLs => - val edgeWithScores = edgeWithScoreLs.flatten - StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil) - } - } - - // def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges) - - def getEdges(q: Query): Future[StepResult] = { - Try { - if (q.steps.isEmpty) { - // TODO: this should be get vertex query. - fallback - } else { - val filterOutFuture = q.queryOption.filterOutQuery match { - case None => fallback - case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) - } - for { - stepResult <- getEdgesStepInner(q) - filterOutInnerResult <- filterOutFuture - } yield { - if (filterOutInnerResult.isEmpty) stepResult - else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult) - } - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } - - def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { - Try { - if (q.steps.isEmpty) fallback - else { - - val queryOption = q.queryOption - def fetch: Future[StepResult] = { - val startStepInnerResult = QueryResult.fromVertices(this, q) - q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => - for { - prevStepInnerResult <- prevStepInnerResultFuture - currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) - } yield { - currentStepInnerResult.copy( - accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, - failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount - ) - } - } - } - - fetch - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } - - def fetchStep(orgQuery: Query, - stepIdx: Int, - stepInnerResult: StepResult, - buildLastStepInnerResult: Boolean = false): Future[StepResult] = { - if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) - else { - val edgeWithScoreLs = stepInnerResult.edgeWithScores - - val q = orgQuery - val queryOption = orgQuery.queryOption - val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None - val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) - val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) - val step = q.steps(stepIdx) - - val alreadyVisited = - if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean] - else alreadyVisitedVertices(stepInnerResult.edgeWithScores) - - val initial = (Map.empty[Vertex, Double], Map.empty[Vertex, ArrayBuffer[EdgeWithScore]]) - val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) => - val key = edgeWithScore.edge.tgtVertex - val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score - val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore]) - buffer += edgeWithScore - (sum + (key -> newScore), group + (key -> buffer)) - } - val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold) - val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2) - - val nextStepSrcVertices = if (prevStepLimit >= 0) { - groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) - } else { - groupedByFiltered.toSeq - } - - val queryRequests = for { - (vertex, prevStepScore) <- nextStepSrcVertices - queryParam <- step.queryParams - } yield { - val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) - val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0 - QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight) - } - - val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) - - filterEdges(orgQuery, stepIdx, queryRequests, - fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) - } - } - - - /** - * responsible to fire parallel fetch call into storage and create future that will return merged result. - * - * @param queryRequests - * @param prevStepEdges - * @return - */ - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { - - val reqWithIdxs = queryRequests.zipWithIndex - val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) - val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => - for { - prev <- prevFuture - cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) - } yield { - prev ++ reqWithIdxs.map(_._2).zip(cur).toMap - } - } - aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } - } - - - def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = { - Try { - if (mq.queries.isEmpty) fallback - else { - val filterOutFuture = mq.queryOption.filterOutQuery match { - case None => fallback - case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) - } - - val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) }) - for { - multiQueryResults <- multiQueryFutures - filterOutInnerResult <- filterOutFuture - } yield { - StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult) - } - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } - - - def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = { - /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache - * so use empty cacheKey. - * */ - val queryParam = QueryParam(labelName = edge.innerLabel.label, - direction = GraphUtil.fromDirection(edge.labelWithDir.dir), - tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), - cacheTTLInMillis = -1) - val q = Query.toQuery(Seq(edge.srcVertex), queryParam) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) - - val storage = getStorage(edge.innerLabel) - storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs => - val (edgeOpt, kvOpt) = - if (kvs.isEmpty) (None, None) - else { - val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) - val _kvOpt = kvs.headOption - (snapshotEdgeOpt, _kvOpt) - } - (queryParam, edgeOpt, kvOpt) - } recoverWith { case ex: Throwable => - logger.error(s"fetchQueryParam failed. fallback return.", ex) - throw FetchTimeoutException(s"${edge.toLogString}") - } - } - - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { - val verticesWithIdx = vertices.zipWithIndex - val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2))) - } - - Future.sequence(futures).map { ls => - ls.flatten.toSeq.sortBy(_._2).map(_._1) - } - } - - /** mutate */ - def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], - labels: Seq[Label], - dir: Int, - ts: Long): Future[Boolean] = { - - val requestTs = ts - val vertices = srcVertices - /** create query per label */ - val queries = for { - label <- labels - } yield { - val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir), - offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw) - val step = Step(List(queryParam)) - Query(vertices, Vector(step)) - } - - // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { - val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { - fetchAndDeleteAll(queries, requestTs) - } { case (allDeleted, deleteSuccess) => - allDeleted && deleteSuccess - }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } - - retryFuture onFailure { - case ex => - logger.error(s"[Error]: deleteAllAdjacentEdges failed.") - } - - retryFuture - } - - def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { - val future = for { - stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true))) - (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) - } yield { - // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") - (allDeleted, ret) - } - - Extensions.retryOnFailure(MaxRetryNum) { - future - } { - logger.error(s"fetch and deleteAll failed.") - (true, false) - } - - } - - def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], - requestTs: Long): Future[(Boolean, Boolean)] = { - stepInnerResultLs.foreach { stepInnerResult => - if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") - } - val futures = for { - stepInnerResult <- stepInnerResultLs - deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs) - if deleteStepInnerResult.edgeWithScores.nonEmpty - } yield { - val head = deleteStepInnerResult.edgeWithScores.head - val label = head.edge.innerLabel - val ret = label.schemaVersion match { - case HBaseType.VERSION3 | HBaseType.VERSION4 => - if (label.consistencyLevel == "strong") { - /** - * read: snapshotEdge on queryResult = O(N) - * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) - */ - mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity)) - } else { - getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) - } - case _ => - - /** - * read: x - * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) - */ - getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) - } - ret - } - - if (futures.isEmpty) { - // all deleted. - Future.successful(true -> true) - } else { - Future.sequence(futures).map { rets => false -> rets.forall(identity) } - } - } - - def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = { - val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => - (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree - } - if (filtered.isEmpty) StepResult.Empty - else { - val head = filtered.head - val label = head.edge.innerLabel - val edgeWithScoreLs = filtered.map { edgeWithScore => - val edge = edgeWithScore.edge - val copiedEdge = label.consistencyLevel match { - case "strong" => - edge.copyEdge(op = GraphUtil.operations("delete"), - version = requestTs, propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) - case _ => - edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) - } -// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { -// case "strong" => -// val edge = edgeWithScore.edge -// edge.property(LabelMeta.timestamp.name, requestTs) -// val _newPropsWithTs = edge.updatePropsWithTs() -// -// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) -// case _ => -// val oldEdge = edgeWithScore.edge -// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs()) -// } -// -// val copiedEdge = -// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) - - val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) - // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") - edgeToDelete - } - //Degree edge? - StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false) - } - } - - // def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = - // storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) - - def mutateElements(elements: Seq[GraphElement], - withWait: Boolean = false): Future[Seq[Boolean]] = { - - val edgeBuffer = ArrayBuffer[(Edge, Int)]() - val vertexBuffer = ArrayBuffer[(Vertex, Int)]() - - elements.zipWithIndex.foreach { - case (e: Edge, idx: Int) => edgeBuffer.append((e, idx)) - case (v: Vertex, idx: Int) => vertexBuffer.append((v, idx)) - case any@_ => logger.error(s"Unknown type: ${any}") - } - - val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result => - edgeBuffer.map(_._2).zip(result) - } - - val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result => - vertexBuffer.map(_._2).zip(result) - } - - val graphFuture = for { - edgesMutated <- edgeFutureWithIdx - verticesMutated <- vertexFutureWithIdx - } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2) - - graphFuture - - } - - // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) - - def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = { - val edgeWithIdxs = edges.zipWithIndex - - val (strongEdges, weakEdges) = - edgeWithIdxs.partition { case (edge, idx) => - val e = edge - e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk") - } - - val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => - val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => - val storage = getStorage(label) - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - - /** multiple edges with weak consistency level will be processed as batch */ - val mutations = edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge) - else Edge.buildOperation(None, Seq(edge)) - - storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate) - } - - storage.writeToStorage(zkQuorum, mutations, withWait).map { ret => - idxs.map(idx => idx -> ret) - } - } - Future.sequence(futures) - } - val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") } - - val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _) - } - - val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - val storage = getStorage(label) - storage.mutateStrongEdges(edges, withWait = true).map { rets => - idxs.zip(rets) - } - } - - for { - weak <- Future.sequence(weakEdgesFutures) - deleteAll <- Future.sequence(deleteAllFutures) - strong <- Future.sequence(strongEdgesFutures) - } yield { - (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2) - } - } - - def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = { - val verticesWithIdx = vertices.zipWithIndex - val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) - } - Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } - } - - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { - val edgesWithIdx = edges.zipWithIndex - val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) - } - Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } - } - - def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = { - val label = edge.innerLabel - - val storage = getStorage(label) - val kvs = storage.buildDegreePuts(edge, degreeVal) - - storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true) - } - - def shutdown(): Unit = { - flushStorage() - Model.shutdown() - } - - def addEdge(srcId: Any, - tgtId: Any, - labelName: String, - direction: String = "out", - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert", - withWait: Boolean = true): Future[Boolean] = { - - val innerEdges = Seq(toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation)) - mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false)) - } - - def addVertex(serviceName: String, - columnName: String, - id: Any, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert", - withWait: Boolean = true): Future[Boolean] = { - val innerVertices = Seq(toVertex(serviceName, columnName, id, props.toMap, ts, operation)) - mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false)) - } - - def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { - val parts = GraphUtil.split(s) - val logType = parts(2) - val element = if (logType == "edge" | logType == "e") { - /** current only edge is considered to be bulk loaded */ - labelMapping.get(parts(5)) match { - case None => - case Some(toReplace) => - parts(5) = toReplace - } - toEdge(parts) - } else if (logType == "vertex" | logType == "v") { - toVertex(parts) - } else { - throw new GraphExceptions.JsonParseException("log type is not exist in log.") - } - - element - } recover { - case e: Exception => - logger.error(s"[toElement]: $e", e) - None - } get - - - def toVertex(s: String): Option[Vertex] = { - toVertex(GraphUtil.split(s)) - } - - def toEdge(s: String): Option[Edge] = { - toEdge(GraphUtil.split(s)) - } - - def toEdge(parts: Array[String]): Option[Edge] = Try { - val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val tempDirection = if (parts.length >= 8) parts(7) else "out" - val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) - Option(edge) - } recover { - case e: Exception => - logger.error(s"[toEdge]: $e", e) - throw e - } get - - def toVertex(parts: Array[String]): Option[Vertex] = Try { - val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation) - Option(vertex) - } recover { - case e: Throwable => - logger.error(s"[toVertex]: $e", e) - throw e - } get - - - def newSnapshotEdge(srcVertex: Vertex, - tgtVertex: Vertex, - label: Label, - dir: Int, - op: Byte, - version: Long, - propsWithTs: Edge.State, - pendingEdgeOpt: Option[Edge], - statusCode: Byte = 0, - lockTs: Option[Long], - tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = { - val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, Edge.EmptyProps, - pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - Edge.fillPropsWithTs(snapshotEdge, propsWithTs) - snapshotEdge - } - - def newEdge(srcVertex: Vertex, - tgtVertex: Vertex, - innerLabel: Label, - dir: Int, - op: Byte = GraphUtil.defaultOpByte, - version: Long = System.currentTimeMillis(), - propsWithTs: Edge.State, - parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[Edge] = None, - pendingEdgeOpt: Option[Edge] = None, - statusCode: Byte = 0, - lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None): Edge = { - val edge = new Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps, - parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - Edge.fillPropsWithTs(edge, propsWithTs) - edge - } - def toEdge(srcId: Any, - tgtId: Any, - labelName: String, - direction: String, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): Edge = { - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - - val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion) - val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion) - - val srcColId = label.srcColumn.id.get - val tgtColId = label.tgtColumn.id.get - - val srcVertex = newVertex(SourceVertexId(label.srcColumn, srcVertexId), System.currentTimeMillis()) - val tgtVertex = newVertex(TargetVertexId(label.tgtColumn, tgtVertexId), System.currentTimeMillis()) - val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) - - val labelWithDir = LabelWithDirection(label.id.get, dir) - val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) - val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - - new Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs) - } - - def newVertex(id: VertexId, - ts: Long = System.currentTimeMillis(), - props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], - op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty): Vertex = { - new Vertex(this, id, ts, props, op, belongLabelIds) - } - def toVertex(serviceName: String, - columnName: String, - id: Any, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): Vertex = { - - val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) - val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - - val srcVertexId = VertexId(column, toInnerVal(id.toString, column.columnType, column.schemaVersion)) - val propsInner = column.propsToInnerVals(props) ++ - Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion)) - - new Vertex(this, srcVertexId, ts, propsInner, op) - } - - override def vertices(objects: AnyRef*): util.Iterator[structure.Vertex] = ??? - - override def tx(): Transaction = ??? - - override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ??? - - override def variables(): Variables = ??? - - override def configuration(): Configuration = ??? - - override def addVertex(objects: AnyRef*): structure.Vertex = ??? - - override def close(): Unit = ??? - - override def compute[C <: GraphComputer](aClass: Class[C]): C = ??? - - override def compute(): GraphComputer = ??? -}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 60900be..064a3d1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -256,7 +256,7 @@ object Management { } } -class Management(graph: Graph) { +class Management(graph: S2Graph) { import Management._ def createStorageTable(zkAddr: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 083159f..b22eb65 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -53,7 +53,7 @@ object PostProcess { case _ => Json.obj("message" -> ex.getMessage) } - def s2EdgeParent(graph: Graph, + def s2EdgeParent(graph: S2Graph, queryOption: QueryOption, parentEdges: Seq[EdgeWithScore]): JsValue = { if (parentEdges.isEmpty) JsNull @@ -141,7 +141,7 @@ object PostProcess { } } - def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = { + def s2VertexToJson(s2Vertex: S2Vertex): Option[JsValue] = { val props = for { (k, v) <- s2Vertex.properties jsVal <- anyValToJsValue(v) @@ -160,7 +160,7 @@ object PostProcess { } } - def verticesToJson(s2Vertices: Seq[Vertex]): JsValue = + def verticesToJson(s2Vertices: Seq[S2Vertex]): JsValue = Json.toJson(s2Vertices.flatMap(s2VertexToJson(_))) def withOptionalFields(queryOption: QueryOption, @@ -189,7 +189,7 @@ object PostProcess { case _ => js } - def toJson(orgQuery: Option[JsValue])(graph: Graph, + def toJson(orgQuery: Option[JsValue])(graph: S2Graph, queryOption: QueryOption, stepResult: StepResult): JsValue = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 170fd0b..eb36258 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -27,7 +27,7 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.parsers.{Where, WhereParser} import org.apache.s2graph.core.rest.TemplateHelper import org.apache.s2graph.core.storage.StorageSerializable._ -import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs, LabelWithDirection} +import org.apache.s2graph.core.types._ import org.hbase.async.ColumnRangeFilter import play.api.libs.json.{JsString, JsNull, JsValue, Json} @@ -39,7 +39,7 @@ object Query { def apply(query: Query): Query = { Query(query.vertices, query.steps, query.queryOption, query.jsonQuery) } - def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = Query(srcVertices, Vector(Step(List(queryParam)))) + def toQuery(srcVertices: Seq[S2Vertex], queryParams: Seq[QueryParam]) = Query(srcVertices, Vector(Step(queryParams))) } @@ -96,7 +96,7 @@ case class QueryOption(removeCycle: Boolean = false, } -case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], +case class Query(vertices: Seq[S2Vertex] = Seq.empty[S2Vertex], steps: IndexedSeq[Step] = Vector.empty[Step], queryOption: QueryOption = QueryOption(), jsonQuery: JsValue = JsNull) { @@ -162,7 +162,7 @@ case class EdgeTransformer(jsValue: JsValue) { } } - def toInnerValOpt(queryParam: QueryParam, edge: Edge, fieldName: String): Option[InnerValLike] = { + def toInnerValOpt(queryParam: QueryParam, edge: S2Edge, fieldName: String): Option[InnerValLike] = { fieldName match { case LabelMeta.to.name => Option(edge.tgtVertex.innerId) case LabelMeta.from.name => Option(edge.srcVertex.innerId) @@ -170,7 +170,7 @@ case class EdgeTransformer(jsValue: JsValue) { } } - def transform(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { + def transform(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = { if (isDefault) Seq(edge) else { val edges = for { @@ -218,7 +218,7 @@ case class Step(queryParams: Seq[QueryParam], } } -case class VertexParam(vertices: Seq[Vertex]) { +case class VertexParam(vertices: Seq[S2Vertex]) { var filters: Option[Map[Byte, InnerValLike]] = None def has(what: Option[Map[Byte, InnerValLike]]): VertexParam = { @@ -306,11 +306,10 @@ case class QueryParam(labelName: String, else label.indexNameMap.getOrElse(indexName, throw new RuntimeException(s"$indexName indexName is not found.")).seq lazy val tgtVertexInnerIdOpt = tgtVertexIdOpt.map { id => - val tmp = label.tgtColumnWithDir(dir) - toInnerVal(id, tmp.columnType, tmp.schemaVersion) + CanInnerValLike.anyToInnerValLike.toInnerVal(id)(label.tgtColumnWithDir(dir).schemaVersion) } - def buildInterval(edgeOpt: Option[Edge]) = intervalOpt match { + def buildInterval(edgeOpt: Option[S2Edge]) = intervalOpt match { case None => Array.empty[Byte] -> Array.empty[Byte] case Some(interval) => val (froms, tos) = interval @@ -358,7 +357,7 @@ case class QueryParam(labelName: String, Bytes.add(bytes, optionalCacheKey) } - private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[Edge]): Seq[(LabelMeta, InnerValLike)] = { + private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2Edge]): Seq[(LabelMeta, InnerValLike)] = { kvs.map { case (propKey, propValJs) => propValJs match { case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") => @@ -376,7 +375,7 @@ case class QueryParam(labelName: String, val propVal = if (InnerVal.isNumericType(labelMeta.dataType)) { - InnerVal.withLong(edge.property(labelMeta.name).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion) + InnerVal.withLong(edge.property(labelMeta.name).value.toString.toLong + padding, label.schemaVersion) } else { edge.property(labelMeta.name).asInstanceOf[S2Property[_]].innerVal } @@ -391,7 +390,7 @@ case class QueryParam(labelName: String, } } - def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[Edge] = None) = { + def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2Edge] = None) = { val fromInnerVal = convertToInner(froms, edgeOpt) val toInnerVal = convertToInner(tos, edgeOpt) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index 3753d0f..bad8361 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -27,7 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Seq, mutable} object QueryResult { - def fromVertices(graph: Graph, + def fromVertices(graph: S2Graph, query: Query): StepResult = { if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { StepResult.Empty @@ -41,7 +41,7 @@ object QueryResult { vertex <- query.vertices } yield { val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) - val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, queryParam.label) + val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label) edgeWithScore } StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false) @@ -51,7 +51,7 @@ object QueryResult { case class QueryRequest(query: Query, stepIdx: Int, - vertex: Vertex, + vertex: S2Vertex, queryParam: QueryParam, prevStepScore: Double = 1.0, labelWeight: Double = 1.0) { @@ -73,7 +73,7 @@ object WithScore { } } -case class EdgeWithScore(edge: Edge, +case class EdgeWithScore(edge: S2Edge, score: Double, label: Label, orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues, @@ -283,7 +283,7 @@ object StepResult { } //TODO: Optimize this. - def filterOut(graph: Graph, + def filterOut(graph: S2Graph, queryOption: QueryOption, baseStepResult: StepResult, filterOutStepResult: StepResult): StepResult = {