http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala new file mode 100644 index 0000000..51a80f9 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -0,0 +1,1397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import java.util +import java.util.concurrent.{Executors, TimeUnit} + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.commons.configuration.Configuration +import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException} +import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage +import org.apache.s2graph.core.storage.{SKeyValue, Storage} +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer +import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.Graph.Variables +import org.apache.tinkerpop.gremlin.structure.util.ElementHelper +import org.apache.tinkerpop.gremlin.structure.{Edge, Graph, T, Transaction} +import play.api.libs.json.{JsObject, Json} + +import scala.annotation.tailrec +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.{Random, Try} + + +object S2Graph { + + type HashKey = (Int, Int, Int, Int, Boolean) + type FilterHashKey = (Int, Int) + + + val DefaultScore = 1.0 + + private val DefaultConfigs: Map[String, AnyRef] = Map( + "hbase.zookeeper.quorum" -> "localhost", + "hbase.table.name" -> "s2graph", + "hbase.table.compression.algorithm" -> "gz", + "phase" -> "dev", + "db.default.driver" -> "org.h2.Driver", + "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL", + "db.default.password" -> "graph", + "db.default.user" -> "graph", + "cache.max.size" -> java.lang.Integer.valueOf(10000), + "cache.ttl.seconds" -> java.lang.Integer.valueOf(60), + "hbase.client.retries.number" -> java.lang.Integer.valueOf(20), + "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort), + "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000), + "max.retry.number" -> java.lang.Integer.valueOf(100), + "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10), + "max.back.off" -> 
java.lang.Integer.valueOf(100), + "back.off.timeout" -> java.lang.Integer.valueOf(1000), + "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), + "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), + "delete.all.fetch.count" -> java.lang.Integer.valueOf(200), + "future.cache.max.size" -> java.lang.Integer.valueOf(100000), + "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), + "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), + "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), + "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000), + "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), + "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), + "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), + "s2graph.storage.backend" -> "hbase", + "query.hardlimit" -> java.lang.Integer.valueOf(100000), + "hbase.zookeeper.znode.parent" -> "/hbase", + "query.log.sample.rate" -> Double.box(0.05) + ) + + var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) + + + + def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = { + val storageBackend = config.getString("s2graph.storage.backend") + logger.info(s"[InitStorage]: $storageBackend") + + storageBackend match { + case "hbase" => new AsynchbaseStorage(graph, config)(ec) + case _ => throw new RuntimeException("not supported storage.") + } + } + + def parseCacheConfig(config: Config, prefix: String): Config = { + import scala.collection.JavaConversions._ + + val kvs = new java.util.HashMap[String, AnyRef]() + for { + entry <- config.entrySet() + (k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix) + } yield { + val newKey = k.replace(prefix, "") + kvs.put(newKey, v.unwrapped()) + } + ConfigFactory.parseMap(kvs) + } + + /** Global helper functions */ + @tailrec + final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { + if (range < sampleNumber || set.size == sampleNumber) set + else randomInt(sampleNumber, range, set + Random.nextInt(range)) + } + + def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { + if (edges.size <= n) { + edges + } else { + val plainEdges = if (queryRequest.queryParam.offset == 0) { + edges.tail + } else edges + + val randoms = randomInt(n, plainEdges.size) + var samples = List.empty[EdgeWithScore] + var idx = 0 + plainEdges.foreach { e => + if (randoms.contains(idx)) samples = e :: samples + idx += 1 + } + samples + } + } + + def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } + edgeWithScores.map { edgeWithScore => + edgeWithScore.copy(score = edgeWithScore.score / sum) + } + } + + def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2Vertex), Boolean] = { + val vertices = for { + edgeWithScore <- edgeWithScoreLs + edge = edgeWithScore.edge + vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex + } yield (edge.labelWithDir, vertex) -> true + + vertices.toMap + } + + /** common methods for filter out, transform, aggregate queryResult */ + def convertEdges(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = { + for { + convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree + } yield 
convertedEdge + } + + def processTimeDecay(queryParam: QueryParam, edge: S2Edge) = { + /* process time decay */ + val tsVal = queryParam.timeDecay match { + case None => 1.0 + case Some(timeDecay) => + val tsVal = try { + val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name) + innerValWithTsOpt.map { innerValWithTs => + val innerVal = innerValWithTs.innerVal + timeDecay.labelMeta.dataType match { + case InnerVal.LONG => innerVal.value match { + case n: BigDecimal => n.bigDecimal.longValue() + case _ => innerVal.toString().toLong + } + case _ => innerVal.toString().toLong + } + } getOrElse (edge.ts) + } catch { + case e: Exception => + logger.error(s"processTimeDecay error. ${edge.toLogString}", e) + edge.ts + } + val timeDiff = queryParam.timestamp - tsVal + timeDecay.decay(timeDiff) + } + + tsVal + } + + def processDuplicates[T](queryParam: QueryParam, + duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = { + + if (queryParam.label.consistencyLevel != "strong") { + //TODO: + queryParam.duplicatePolicy match { + case DuplicatePolicy.First => Seq(duplicates.head) + case DuplicatePolicy.Raw => duplicates + case DuplicatePolicy.CountSum => + val countSum = duplicates.size + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) + case _ => + val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) + } + } else { + duplicates + } + } + + def toHashKey(queryParam: QueryParam, edge: S2Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { + val src = edge.srcVertex.innerId.hashCode() + val tgt = edge.tgtVertex.innerId.hashCode() + val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) + val filterHashKey = (src, tgt) + + (hashKey, filterHashKey) + } + + def filterEdges(q: Query, + stepIdx: Int, + queryRequests: Seq[QueryRequest], + queryResultLsFuture: Future[Seq[StepResult]], + queryParams: Seq[QueryParam], + alreadyVisited: Map[(LabelWithDirection, S2Vertex), Boolean] = Map.empty, + buildLastStepInnerResult: Boolean = true, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { + + queryResultLsFuture.map { queryRequestWithResultLs => + val (cursors, failCount) = { + val _cursors = ArrayBuffer.empty[Array[Byte]] + var _failCount = 0 + + queryRequestWithResultLs.foreach { stepResult => + _cursors.append(stepResult.cursors: _*) + _failCount += stepResult.failCount + } + + _cursors -> _failCount + } + + + if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) + else { + val isLastStep = stepIdx == q.steps.size - 1 + val queryOption = q.queryOption + val step = q.steps(stepIdx) + + val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) + val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult + val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) + + if (shouldBuildInnerResults) { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + edgeWithScore + } + + /** process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = 
failCount) + + } else { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /** Select */ + val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) + +// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + + val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) + /** OrderBy */ + val orderByValues = + if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) + else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) + + /** StepGroupBy */ + val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) + + /** GroupBy */ + val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) + + /** FilterOut */ + val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) + + newEdgeWithScore.copy(orderByValues = orderByValues, + stepGroupByValues = stepGroupByValues, + groupByValues = groupByValues, + filterOutValues = filterOutValues) + } + + /** process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + + /** process ordered list */ + val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil + + /** process grouped list */ + val grouped = + if (queryOption.groupBy.keys.isEmpty) Nil + else { + val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() + results.groupBy { edgeWithScore => + // edgeWithScore.groupByValues.map(_.map(_.toString)) + edgeWithScore.groupByValues + }.foreach { case (k, ls) => + val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) + + val newScoreSum = scoreSum + + /** + * watch out here. by calling toString on Any, we lose type information which will be used + * later for toJson. 
+ */ + if (merged.nonEmpty) { + val newKey = merged.head.groupByValues + agg += (newKey -> (newScoreSum, merged)) + } + } + agg.toSeq.sortBy(_._2._1 * -1) + } + + StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) + } + } + } + } + + private def toEdgeWithScores(queryRequest: QueryRequest, + stepResult: StepResult, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val prevScore = queryRequest.prevStepScore + val labelWeight = queryRequest.labelWeight + val edgeWithScores = stepResult.edgeWithScores + + val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent + val parents = if (shouldBuildParents) { + parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /** Select */ + val mergedPropsWithTs = + if (queryOption.selectColumns.isEmpty) { + edge.propertyValuesInner() + } else { + val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp)) + edge.propertyValues(queryOption.selectColumns) ++ initial + } + + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + edgeWithScore.copy(edge = newEdge) + } + } else Nil + + // skip + if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores + else { + val degreeScore = 0.0 + + val sampled = + if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + else edgeWithScores + + val withScores = for { + edgeWithScore <- sampled + } yield { + val edge = edgeWithScore.edge + val edgeScore = edgeWithScore.score + val score = queryParam.scorePropagateOp match { + case "plus" => edgeScore + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) + case _ => edgeScore * prevScore + } + + val tsVal = processTimeDecay(queryParam, edge) + val newScore = degreeScore + score + // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge + val newEdge = edge.copy(parentEdges = parents) + edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) + } + + val normalized = + if (queryParam.shouldNormalize) normalize(withScores) + else withScores + + normalized + } + } + + private def buildResult[T](query: Query, + stepIdx: Int, + stepResultLs: Seq[(QueryRequest, StepResult)], + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T) + (implicit ev: WithScore[T]): ListBuffer[T] = { + import scala.collection._ + + val results = ListBuffer.empty[T] + val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty + val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty + val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + + var numOfDuplicates = 0 + val queryOption = query.queryOption + val step = query.steps(stepIdx) + val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet + val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet + + stepResultLs.foreach { case (queryRequest, stepInnerResult) => + val queryParam = 
queryRequest.queryParam + val label = queryParam.label + val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) + val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) + + val propsSelectColumns = (for { + column <- queryOption.propsSelectColumns + labelMeta <- label.metaPropsInvMap.get(column) + } yield labelMeta) + + for { + edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) + } { + val edge = edgeWithScore.edge + val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) + // params += (hashKey -> queryParam) // + + /** check if this edge should be exlcuded. */ + if (shouldBeExcluded) { + edgesToExclude.add(filterHashKey) + } else { + if (shouldBeIncluded) { + edgesToInclude.add(filterHashKey) + } + val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) + + sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) + duplicates.get(hashKey) match { + case None => + val newLs = ListBuffer.empty[(FilterHashKey, T)] + newLs += (filterHashKey -> newEdgeWithScore) + duplicates += (hashKey -> newLs) // + case Some(old) => + numOfDuplicates += 1 + old += (filterHashKey -> newEdgeWithScore) // + } + } + } + } + + + if (numOfDuplicates == 0) { + // no duplicates at all. + for { + (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + } { + results += edgeWithScore + } + } else { + // need to resolve duplicates. + val seen = new mutable.HashSet[HashKey]() + for { + (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + if !seen.contains(hashKey) + } { + // val queryParam = params(hashKey) + processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => + if (ev.score(duplicate) >= queryParam.threshold) { + seen += hashKey + results += duplicate + } + } + } + } + results + } + +} + +class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph { + + import S2Graph._ + + val config = _config.withFallback(S2Graph.DefaultConfig) + + Model.apply(config) + Model.loadCache() + + val MaxRetryNum = config.getInt("max.retry.number") + val MaxBackOff = config.getInt("max.back.off") + val BackoffTimeout = config.getInt("back.off.timeout") + val DeleteAllFetchCount = config.getInt("delete.all.fetch.count") + val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") + val FailProb = config.getDouble("hbase.fail.prob") + val LockExpireDuration = config.getInt("lock.expire.time") + val MaxSize = config.getInt("future.cache.max.size") + val ExpireAfterWrite = config.getInt("future.cache.expire.after.write") + val ExpireAfterAccess = config.getInt("future.cache.expire.after.access") + val WaitTimeout = Duration(60, TimeUnit.SECONDS) + val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + + private def confWithFallback(conf: Config): Config = { + conf.withFallback(config) + } + + /** + * TODO: we need to some way to handle malformed configuration for storage. 
+ */ + val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = { + val labels = Label.findAll() + val services = Service.findAll() + + val labelConfigs = labels.flatMap(_.toStorageConfig) + val serviceConfigs = services.flatMap(_.toStorageConfig) + + val configs = (labelConfigs ++ serviceConfigs).map { conf => + confWithFallback(conf) + }.toSet + + val pools = new java.util.HashMap[Config, Storage[_, _]]() + configs.foreach { config => + pools.put(config, S2Graph.initStorage(this, config)(ec)) + } + + val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]() + + labels.foreach { label => + if (label.storageConfigOpt.isDefined) { + m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get)) + } + } + + services.foreach { service => + if (service.storageConfigOpt.isDefined) { + m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get)) + } + } + + m + } + + val defaultStorage: Storage[_, _] = S2Graph.initStorage(this, config)(ec) + + /** QueryLevel FutureCache */ + val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty) + + for { + entry <- config.entrySet() if S2Graph.DefaultConfigs.contains(entry.getKey) + (k, v) = (entry.getKey, entry.getValue) + } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") + + def getStorage(service: Service): Storage[_, _] = { + storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) + } + + def getStorage(label: Label): Storage[_, _] = { + storagePool.getOrElse(s"label:${label.label}", defaultStorage) + } + + def flushStorage(): Unit = { + storagePool.foreach { case (_, storage) => + + /** flush is blocking */ + storage.flush() + } + } + + def fallback = Future.successful(StepResult.Empty) + + def checkEdges(edges: Seq[S2Edge]): Future[StepResult] = { + val futures = for { + edge <- edges + } yield { + fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => + edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + } + } + + Future.sequence(futures).map { edgeWithScoreLs => + val edgeWithScores = edgeWithScoreLs.flatten + StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil) + } + } + + // def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges) + + def getEdges(q: Query): Future[StepResult] = { + Try { + if (q.steps.isEmpty) { + // TODO: this should be get vertex query. 
+ fallback + } else { + val filterOutFuture = q.queryOption.filterOutQuery match { + case None => fallback + case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + } + for { + stepResult <- getEdgesStepInner(q) + filterOutInnerResult <- filterOutFuture + } yield { + if (filterOutInnerResult.isEmpty) stepResult + else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult) + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + Try { + if (q.steps.isEmpty) fallback + else { + + val queryOption = q.queryOption + def fetch: Future[StepResult] = { + val startStepInnerResult = QueryResult.fromVertices(this, q) + q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => + for { + prevStepInnerResult <- prevStepInnerResultFuture + currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) + } yield { + currentStepInnerResult.copy( + accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, + failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount + ) + } + } + } + + fetch + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def fetchStep(orgQuery: Query, + stepIdx: Int, + stepInnerResult: StepResult, + buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) + else { + val edgeWithScoreLs = stepInnerResult.edgeWithScores + + val q = orgQuery + val queryOption = orgQuery.queryOption + val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None + val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) + val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) + val step = q.steps(stepIdx) + + val alreadyVisited = + if (stepIdx == 0) Map.empty[(LabelWithDirection, S2Vertex), Boolean] + else alreadyVisitedVertices(stepInnerResult.edgeWithScores) + + val initial = (Map.empty[S2Vertex, Double], Map.empty[S2Vertex, ArrayBuffer[EdgeWithScore]]) + val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) => + val key = edgeWithScore.edge.tgtVertex + val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score + val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore]) + buffer += edgeWithScore + (sum + (key -> newScore), group + (key -> buffer)) + } + val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold) + val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2) + + val nextStepSrcVertices = if (prevStepLimit >= 0) { + groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) + } else { + groupedByFiltered.toSeq + } + + val queryRequests = for { + (vertex, prevStepScore) <- nextStepSrcVertices + queryParam <- step.queryParams + } yield { + val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) + val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0 + QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight) + } + + val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) + + filterEdges(orgQuery, stepIdx, queryRequests, + fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, 
buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) + } + } + + + /** + * responsible to fire parallel fetch call into storage and create future that will return merged result. + * + * @param queryRequests + * @param prevStepEdges + * @return + */ + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { + + val reqWithIdxs = queryRequests.zipWithIndex + val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) + val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => + for { + prev <- prevFuture + cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + } yield { + prev ++ reqWithIdxs.map(_._2).zip(cur).toMap + } + } + aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } + } + + + def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = { + Try { + if (mq.queries.isEmpty) fallback + else { + val filterOutFuture = mq.queryOption.filterOutQuery match { + case None => fallback + case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + } + + val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) }) + for { + multiQueryResults <- multiQueryFutures + filterOutInnerResult <- filterOutFuture + } yield { + StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult) + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + + def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { + /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache + * so use empty cacheKey. + * */ + val queryParam = QueryParam(labelName = edge.innerLabel.label, + direction = GraphUtil.fromDirection(edge.labelWithDir.dir), + tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), + cacheTTLInMillis = -1) + val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) + + val storage = getStorage(edge.innerLabel) + storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs => + val (edgeOpt, kvOpt) = + if (kvs.isEmpty) (None, None) + else { + val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) + val _kvOpt = kvs.headOption + (snapshotEdgeOpt, _kvOpt) + } + (queryParam, edgeOpt, kvOpt) + } recoverWith { case ex: Throwable => + logger.error(s"fetchQueryParam failed. 
fallback return.", ex) + throw FetchTimeoutException(s"${edge.toLogString}") + } + } + + def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { + val verticesWithIdx = vertices.zipWithIndex + val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => + getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2))) + } + + Future.sequence(futures).map { ls => + ls.flatten.toSeq.sortBy(_._2).map(_._1) + } + } + + /** mutate */ + def deleteAllAdjacentEdges(srcVertices: Seq[S2Vertex], + labels: Seq[Label], + dir: Int, + ts: Long): Future[Boolean] = { + + val requestTs = ts + val vertices = srcVertices + /** create query per label */ + val queries = for { + label <- labels + } yield { + val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir), + offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw) + val step = Step(List(queryParam)) + Query(vertices, Vector(step)) + } + + // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { + val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { + fetchAndDeleteAll(queries, requestTs) + } { case (allDeleted, deleteSuccess) => + allDeleted && deleteSuccess + }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } + + retryFuture onFailure { + case ex => + logger.error(s"[Error]: deleteAllAdjacentEdges failed.") + } + + retryFuture + } + + def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { + val future = for { + stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true))) + (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) + } yield { + // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") + (allDeleted, ret) + } + + Extensions.retryOnFailure(MaxRetryNum) { + future + } { + logger.error(s"fetch and deleteAll failed.") + (true, false) + } + + } + + def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], + requestTs: Long): Future[(Boolean, Boolean)] = { + stepInnerResultLs.foreach { stepInnerResult => + if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") + } + val futures = for { + stepInnerResult <- stepInnerResultLs + deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs) + if deleteStepInnerResult.edgeWithScores.nonEmpty + } yield { + val head = deleteStepInnerResult.edgeWithScores.head + val label = head.edge.innerLabel + val ret = label.schemaVersion match { + case HBaseType.VERSION3 | HBaseType.VERSION4 => + if (label.consistencyLevel == "strong") { + /** + * read: snapshotEdge on queryResult = O(N) + * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) + */ + mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity)) + } else { + getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) + } + case _ => + + /** + * read: x + * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) + */ + getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) + } + ret + } + + if (futures.isEmpty) { + // all deleted. 
+ Future.successful(true -> true) + } else { + Future.sequence(futures).map { rets => false -> rets.forall(identity) } + } + } + + def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = { + val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => + (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree + } + if (filtered.isEmpty) StepResult.Empty + else { + val head = filtered.head + val label = head.edge.innerLabel + val edgeWithScoreLs = filtered.map { edgeWithScore => + val edge = edgeWithScore.edge + val copiedEdge = label.consistencyLevel match { + case "strong" => + edge.copyEdge(op = GraphUtil.operations("delete"), + version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + case _ => + edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + } +// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { +// case "strong" => +// val edge = edgeWithScore.edge +// edge.property(LabelMeta.timestamp.name, requestTs) +// val _newPropsWithTs = edge.updatePropsWithTs() +// +// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) +// case _ => +// val oldEdge = edgeWithScore.edge +// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs()) +// } +// +// val copiedEdge = +// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) + + val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) + // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") + edgeToDelete + } + //Degree edge? + StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false) + } + } + + // def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = + // storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) + + def mutateElements(elements: Seq[GraphElement], + withWait: Boolean = false): Future[Seq[Boolean]] = { + + val edgeBuffer = ArrayBuffer[(S2Edge, Int)]() + val vertexBuffer = ArrayBuffer[(S2Vertex, Int)]() + + elements.zipWithIndex.foreach { + case (e: S2Edge, idx: Int) => edgeBuffer.append((e, idx)) + case (v: S2Vertex, idx: Int) => vertexBuffer.append((v, idx)) + case any@_ => logger.error(s"Unknown type: ${any}") + } + + val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result => + edgeBuffer.map(_._2).zip(result) + } + + val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result => + vertexBuffer.map(_._2).zip(result) + } + + val graphFuture = for { + edgesMutated <- edgeFutureWithIdx + verticesMutated <- vertexFutureWithIdx + } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2) + + graphFuture + + } + + // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) + + def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[Boolean]] = { + val edgeWithIdxs = edges.zipWithIndex + + val (strongEdges, weakEdges) = + edgeWithIdxs.partition { case (edge, idx) => + val e = edge + e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk") + } + + val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => + val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => + val storage = getStorage(label) + val edges = edgeGroup.map(_._1) + val idxs = 
edgeGroup.map(_._2) + + /** multiple edges with weak consistency level will be processed as batch */ + val mutations = edges.flatMap { edge => + val (_, edgeUpdate) = + if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) + else S2Edge.buildOperation(None, Seq(edge)) + + storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate) + } + + storage.writeToStorage(zkQuorum, mutations, withWait).map { ret => + idxs.map(idx => idx -> ret) + } + } + Future.sequence(futures) + } + val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") } + + val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _) + } + + val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + val storage = getStorage(label) + storage.mutateStrongEdges(edges, withWait = true).map { rets => + idxs.zip(rets) + } + } + + for { + weak <- Future.sequence(weakEdgesFutures) + deleteAll <- Future.sequence(deleteAllFutures) + strong <- Future.sequence(strongEdgesFutures) + } yield { + (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2) + } + } + + def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = { + val verticesWithIdx = vertices.zipWithIndex + val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => + getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) + } + Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } + } + + def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { + val edgesWithIdx = edges.zipWithIndex + val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => + getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + } + Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } + } + + def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[Boolean] = { + val label = edge.innerLabel + + val storage = getStorage(label) + val kvs = storage.buildDegreePuts(edge, degreeVal) + + storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true) + } + + def shutdown(): Unit = { + flushStorage() + Model.shutdown() + } + + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { + val parts = GraphUtil.split(s) + val logType = parts(2) + val element = if (logType == "edge" | logType == "e") { + /** current only edge is considered to be bulk loaded */ + labelMapping.get(parts(5)) match { + case None => + case Some(toReplace) => + parts(5) = toReplace + } + toEdge(parts) + } else if (logType == "vertex" | logType == "v") { + toVertex(parts) + } else { + throw new GraphExceptions.JsonParseException("log type is not exist in log.") + } + + element + } recover { + case e: Exception => + logger.error(s"[toElement]: $e", e) + None + } get + + + def toVertex(s: String): Option[S2Vertex] = { + toVertex(GraphUtil.split(s)) + } + + def toEdge(s: String): Option[S2Edge] = { + 
toEdge(GraphUtil.split(s)) + } + + def toEdge(parts: Array[String]): Option[S2Edge] = Try { + val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val tempDirection = if (parts.length >= 8) parts(7) else "out" + val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) + Option(edge) + } recover { + case e: Exception => + logger.error(s"[toEdge]: $e", e) + throw e + } get + + def toVertex(parts: Array[String]): Option[S2Vertex] = Try { + val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation) + Option(vertex) + } recover { + case e: Throwable => + logger.error(s"[toVertex]: $e", e) + throw e + } get + + def toEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2Edge = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + + val srcVertexIdInnerVal = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion) + val tgtVertexIdInnerVal = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion) + + val srcVertex = newVertex(SourceVertexId(label.srcColumn, srcVertexIdInnerVal), System.currentTimeMillis()) + val tgtVertex = newVertex(TargetVertexId(label.tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis()) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + new S2Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs) + } + + def toVertex(serviceName: String, + columnName: String, + id: Any, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2Vertex = { + + val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + val srcVertexId = VertexId(column, toInnerVal(id.toString, column.columnType, column.schemaVersion)) + val propsInner = column.propsToInnerVals(props) ++ + Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion)) + + val vertex = new S2Vertex(this, srcVertexId, ts, S2Vertex.EmptyProps, op) + S2Vertex.fillPropsWithTs(vertex, propsInner) + vertex + } + + /** + * helper to create new Edge instance from given parameters on memory(not actually stored in storage). 
+ * + * Since we are using mutable map for property value(propsWithTs), + * we should make sure that reference for mutable map never be shared between multiple Edge instances. + * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph. + * + * Note that we are using following convention + * 1. `add*` for method that actually store instance into storage, + * 2. `new*` for method that only create instance on memory, but not store it into storage. + * + * @param srcVertex + * @param tgtVertex + * @param innerLabel + * @param dir + * @param op + * @param version + * @param propsWithTs + * @param parentEdges + * @param originalEdgeOpt + * @param pendingEdgeOpt + * @param statusCode + * @param lockTs + * @param tsInnerValOpt + * @return + */ + def newEdge(srcVertex: S2Vertex, + tgtVertex: S2Vertex, + innerLabel: Label, + dir: Int, + op: Byte = GraphUtil.defaultOpByte, + version: Long = System.currentTimeMillis(), + propsWithTs: S2Edge.State, + parentEdges: Seq[EdgeWithScore] = Nil, + originalEdgeOpt: Option[S2Edge] = None, + pendingEdgeOpt: Option[S2Edge] = None, + statusCode: Byte = 0, + lockTs: Option[Long] = None, + tsInnerValOpt: Option[InnerValLike] = None): S2Edge = { + val edge = new S2Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + S2Edge.fillPropsWithTs(edge, propsWithTs) + edge + } + + /** + * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage). + * + * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method. + * @param srcVertex + * @param tgtVertex + * @param label + * @param dir + * @param op + * @param version + * @param propsWithTs + * @param pendingEdgeOpt + * @param statusCode + * @param lockTs + * @param tsInnerValOpt + * @return + */ + private[core] def newSnapshotEdge(srcVertex: S2Vertex, + tgtVertex: S2Vertex, + label: Label, + dir: Int, + op: Byte, + version: Long, + propsWithTs: S2Edge.State, + pendingEdgeOpt: Option[S2Edge], + statusCode: Byte = 0, + lockTs: Option[Long], + tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = { + val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps, + pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs) + snapshotEdge + } + + /** + * internal helper to actually store a single edge based on given peramters. + * + * Note that this is used from S2Vertex to implement blocking interface from Tp3. + * Once tp3 provide AsyncStep, then this can be changed to return Java's CompletableFuture. 
+ * + * @param srcVertex + * @param tgtVertex + * @param labelName + * @param direction + * @param props + * @param ts + * @param operation + * @return + */ + private[core] def addEdgeInner(srcVertex: S2Vertex, + tgtVertex: S2Vertex, + labelName: String, + direction: String = "out", + props: Map[String, AnyRef] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2Edge = { + Await.result(addEdgeInnerAsync(srcVertex, tgtVertex, labelName, direction, props, ts, operation), WaitTimeout) + } + + private[core] def addEdgeInnerAsync(srcVertex: S2Vertex, + tgtVertex: S2Vertex, + labelName: String, + direction: String = "out", + props: Map[String, AnyRef] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): Future[S2Edge] = { + // Validations on input parameter + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) +// if (srcVertex.id.column != label.srcColumnWithDir(dir)) throw new RuntimeException(s"srcVertex's column[${srcVertex.id.column}] is not matched to label's srcColumn[${label.srcColumnWithDir(dir)}") +// if (tgtVertex.id.column != label.tgtColumnWithDir(dir)) throw new RuntimeException(s"tgtVertex's column[${tgtVertex.id.column}] is not matched to label's tgtColumn[${label.tgtColumnWithDir(dir)}") + + // Convert given Map[String, AnyRef] property into internal class. + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + val edge = newEdge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs) + // store edge into storage withWait option. + mutateEdges(Seq(edge), withWait = true).map { rets => + if (!rets.headOption.getOrElse(false)) throw new RuntimeException("add edge failed.") + else edge + } + } + + + def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = { + val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) + newVertexId(service, column, id) + } + + /** + * helper to create S2Graph's internal VertexId instance with given parameters. 
+ * @param service + * @param column + * @param id + * @return + */ + def newVertexId(service: Service, + column: ServiceColumn, + id: Any): VertexId = { + val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion) + new VertexId(column, innerVal) + } + + def newVertex(id: VertexId, + ts: Long = System.currentTimeMillis(), + props: S2Vertex.Props = S2Vertex.EmptyProps, + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty): S2Vertex = { + val vertex = new S2Vertex(this, id, ts, S2Vertex.EmptyProps, op, belongLabelIds) + S2Vertex.fillPropsWithTs(vertex, props) + vertex + } + + def getVertex(vertexId: VertexId): Option[S2Vertex] = { + val v = newVertex(vertexId) + Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout) + } + + def fetchEdges(vertex: S2Vertex, labelNames: Seq[String], direction: String = "out"): util.Iterator[Edge] = { + Await.result(fetchEdgesAsync(vertex, labelNames, direction), WaitTimeout) + } + + def fetchEdgesAsync(vertex: S2Vertex, labelNames: Seq[String], direction: String = "out"): Future[util.Iterator[Edge]] = { + val queryParams = labelNames.map { l => + QueryParam(labelName = l, direction = direction) + } + val query = Query.toQuery(Seq(vertex), queryParams) + getEdges(query).map { stepResult => + val ls = new util.ArrayList[Edge]() + stepResult.edgeWithScores.foreach(es => ls.add(es.edge)) + ls.iterator() + } + } + + override def vertices(vertexIds: AnyRef*): util.Iterator[structure.Vertex] = { + val vertices = for { + vertexId <- vertexIds if vertexId.isInstanceOf[VertexId] + } yield newVertex(vertexId.asInstanceOf[VertexId]) + + val future = getVertices(vertices).map { vs => + val ls = new util.ArrayList[structure.Vertex]() + ls.addAll(vs) + ls.iterator() + } + Await.result(future, WaitTimeout) + } + + override def tx(): Transaction = ??? + + override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ??? + + override def variables(): Variables = ??? + + override def configuration(): Configuration = ??? + + override def addVertex(kvs: AnyRef*): structure.Vertex = { + val kvsMap = ElementHelper.asMap(kvs: _*).asScala.toMap + val id = kvsMap.getOrElse(T.id.toString, throw new RuntimeException("T.id is required.")) + val serviceColumnNames = kvsMap.getOrElse(T.label.toString, throw new RuntimeException("ServiceName::ColumnName is required.")).toString + val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter) + if (names.length != 2) throw new RuntimeException("malformed data on vertex label.") + val serviceName = names(0) + val columnName = names(1) + + val vertex = toVertex(serviceName, columnName, id, kvsMap) + val future = mutateVertices(Seq(vertex), withWait = true).map { vs => + if (vs.forall(identity)) vertex + else throw new RuntimeException("addVertex failed.") + } + Await.result(future, WaitTimeout) + } + + def addVertex(id: VertexId, + ts: Long = System.currentTimeMillis(), + props: S2Vertex.Props = S2Vertex.EmptyProps, + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty): S2Vertex = { + val vertex = newVertex(id, ts, props, op, belongLabelIds) + val future = mutateVertices(Seq(vertex), withWait = true).map { rets => + if (rets.forall(identity)) vertex + else throw new RuntimeException("addVertex failed.") + } + Await.result(future, WaitTimeout) + } + + override def close(): Unit = { + shutdown() + } + + override def compute[C <: GraphComputer](aClass: Class[C]): C = ??? + + override def compute(): GraphComputer = ??? +}
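
For orientation, the following is a minimal usage sketch of the S2Graph facade introduced in the file above, using only methods defined there (toVertex, toEdge, mutateVertices, mutateEdges, getEdges, shutdown). It is not part of the commit: the service "s2graph", column "user", label "friends", and the "since" property are illustrative names assumed to be registered in the metastore beforehand.

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core._
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object S2GraphUsageSketch extends App {
      // Missing keys fall back to S2Graph.DefaultConfig (HBase storage, local metastore).
      val graph = new S2Graph(ConfigFactory.load())

      // Build elements in memory with the to* helpers, then persist them.
      // "since" is assumed to be a declared label meta on "friends".
      val alice = graph.toVertex("s2graph", "user", "alice")
      val bob   = graph.toVertex("s2graph", "user", "bob")
      val edge  = graph.toEdge("alice", "bob", "friends", "out", Map("since" -> 2017L))

      // withWait = true blocks until storage acknowledges each mutation.
      val mutated: Future[Seq[Boolean]] = for {
        _    <- graph.mutateVertices(Seq(alice, bob), withWait = true)
        rets <- graph.mutateEdges(Seq(edge), withWait = true)
      } yield rets
      Await.result(mutated, 10.seconds)

      // One-step traversal over the "friends" label.
      val step   = Step(List(QueryParam(labelName = "friends", direction = "out")))
      val result = Await.result(graph.getEdges(Query(Seq(alice), Vector(step))), 10.seconds)
      result.edgeWithScores.foreach(es => println(es.edge.toLogString))

      graph.shutdown()
    }

The sketch deliberately mirrors the internal call pattern of deleteAllAdjacentEdges and fetchEdgesAsync (QueryParam -> Step -> Query -> getEdges), which is the traversal path this commit wires into the TinkerPop Graph interface.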
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala index 46f5ecf..6a47e46 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala @@ -27,16 +27,18 @@ import org.apache.tinkerpop.gremlin.structure.{Property} import scala.util.hashing.MurmurHash3 -case class S2Property[V](element: Edge, +case class S2Property[V](element: S2Edge, labelMeta: LabelMeta, key: String, - value: V, + v: V, ts: Long) extends Property[V] { import CanInnerValLike._ lazy val innerVal = anyToInnerValLike.toInnerVal(value)(element.innerLabel.schemaVersion) lazy val innerValWithTs = InnerValLikeWithTs(innerVal, ts) + val value = castValue(v, labelMeta.dataType).asInstanceOf[V] + def bytes: Array[Byte] = { innerVal.bytes } @@ -64,4 +66,5 @@ case class S2Property[V](element: Edge, override def toString(): String = { Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, "ts" -> ts).toString } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala new file mode 100644 index 0000000..7fd2ac4 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core + +import java.util +import java.util.function.{Consumer, BiConsumer} + +import org.apache.s2graph.core.S2Vertex.Props +import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta, Service, ServiceColumn} +import org.apache.s2graph.core.types._ +import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality +import org.apache.tinkerpop.gremlin.structure.util.ElementHelper +import org.apache.tinkerpop.gremlin.structure.{Direction, Vertex, Edge, VertexProperty} +import play.api.libs.json.Json +import scala.collection.JavaConverters._ + +case class S2Vertex(graph: S2Graph, + id: VertexId, + ts: Long = System.currentTimeMillis(), + props: Props = S2Vertex.EmptyProps, + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with Vertex { + + val innerId = id.innerId + + val innerIdVal = innerId.value + + lazy val properties = for { + (k, v) <- props.asScala + } yield v.columnMeta.name -> v.value + + def schemaVer = serviceColumn.schemaVersion + + def serviceColumn = ServiceColumn.findById(id.colId) + + def columnName = serviceColumn.columnName + + lazy val service = Service.findById(serviceColumn.serviceId) + + lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName) + + def defaultProps = { + val default = S2Vertex.EmptyProps + val newProps = new S2VertexProperty(this, ColumnMeta.lastModifiedAtColumn, ColumnMeta.lastModifiedAtColumn.name, ts) + default.put(ColumnMeta.lastModifiedAtColumn.name, newProps) + default + } + + // lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues + + /** TODO: make this as configurable */ + override def serviceName = service.serviceName + + override def isAsync = false + + override def queueKey = Seq(ts.toString, serviceName).mkString("|") + + override def queuePartitionKey = id.innerId.toString + + def propsWithName = for { + (k, v) <- props.asScala + } yield (v.columnMeta.name -> v.value.toString) + + override def hashCode() = { + val hash = id.hashCode() + // logger.debug(s"Vertex.hashCode: $this -> $hash") + hash + } + + override def equals(obj: Any) = { + obj match { + case otherVertex: S2Vertex => + val ret = id == otherVertex.id + // logger.debug(s"Vertex.equals: $this, $obj => $ret") + ret + case _ => false + } + } + + override def toString(): String = { + Map("id" -> id.toString(), "ts" -> ts, "props" -> "", "op" -> op, "belongLabelIds" -> belongLabelIds).toString() + } + + def toLogString(): String = { + val (serviceName, columnName) = + if (!id.storeColId) ("", "") + else (serviceColumn.service.serviceName, serviceColumn.columnName) + + if (propsWithName.nonEmpty) + Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t") + else + Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t") + } + + def copyVertexWithState(props: Props): S2Vertex = { + val newVertex = copy(props = S2Vertex.EmptyProps) + S2Vertex.fillPropsWithTs(newVertex, props) + newVertex + } + + override def vertices(direction: Direction, edgeLabels: String*): util.Iterator[Vertex] = { + val arr = new util.ArrayList[Vertex]() + edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] { + override def accept(edge: Edge): Unit = { + direction match { + case Direction.OUT => arr.add(edge.inVertex()) + case Direction.IN => arr.add(edge.outVertex()) + case _ => + arr.add(edge.inVertex()) + arr.add(edge.outVertex()) + } + } + }) + arr.iterator() + } + + override def edges(direction: Direction, 
labelNames: String*): util.Iterator[Edge] = { + graph.fetchEdges(this, labelNames, direction.name()) + } + + override def property[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = { + cardinality match { + case Cardinality.single => + val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Vertex.")) + val newProps = new S2VertexProperty[V](this, columnMeta, key, value) + props.put(key, newProps) + newProps + case _ => throw new RuntimeException("only single cardinality is supported.") + } + } + + override def addEdge(label: String, vertex: Vertex, kvs: AnyRef*): S2Edge = { + vertex match { + case otherV: S2Vertex => + val props = ElementHelper.asMap(kvs: _*).asScala.toMap + //TODO: direction, operation, _timestamp need to be reserved property key. + val direction = props.get("direction").getOrElse("out").toString + val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis()) + val operation = props.get("operation").map(_.toString).getOrElse("insert") + + graph.addEdgeInner(this, otherV, label, direction, props, ts, operation) + case _ => throw new RuntimeException("only S2Graph vertex can be used.") + } + } + + override def property[V](key: String): VertexProperty[V] = { + props.get(key).asInstanceOf[S2VertexProperty[V]] + } + + override def properties[V](keys: String*): util.Iterator[VertexProperty[V]] = { + val ls = for { + key <- keys + } yield { + property[V](key) + } + ls.iterator.asJava + } + + override def remove(): Unit = ??? + + override def label(): String = service.serviceName + S2Vertex.VertexLabelDelimiter + serviceColumn.columnName +} + +object S2Vertex { + + val VertexLabelDelimiter = "::" + + type Props = java.util.Map[String, S2VertexProperty[_]] + type State = Map[ColumnMeta, InnerValLike] + def EmptyProps = new java.util.HashMap[String, S2VertexProperty[_]]() + def EmptyState = Map.empty[ColumnMeta, InnerValLike] + + def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId + + def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue + + def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue + + def fillPropsWithTs(vertex: S2Vertex, props: Props): Unit = { + props.forEach(new BiConsumer[String, S2VertexProperty[_]] { + override def accept(key: String, p: S2VertexProperty[_]): Unit = { + vertex.property(Cardinality.single, key, p.value) + } + }) + } + + def fillPropsWithTs(vertex: S2Vertex, state: State): Unit = { + state.foreach { case (k, v) => vertex.property(Cardinality.single, k.name, v.value) } + } + + def propsToState(props: Props): State = { + props.asScala.map { case (k, v) => + v.columnMeta -> v.innerVal + }.toMap + } + + def stateToProps(vertex: S2Vertex, state: State): Props = { + state.foreach { case (k, v) => + vertex.property(k.name, v.value) + } + vertex.props + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala index 0f9f87b..9f8c682 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala @@ -25,23 +25,44 @@ import org.apache.s2graph.core.mysqls.ColumnMeta import 
org.apache.s2graph.core.types.CanInnerValLike import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty, Vertex => TpVertex} -case class S2VertexProperty[V](element: TpVertex, +import scala.util.hashing.MurmurHash3 + +case class S2VertexProperty[V](element: S2Vertex, columnMeta: ColumnMeta, key: String, - value: V) extends VertexProperty[V] { - implicit val encodingVer = columnMeta.serviceColumn.schemaVersion - val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(value) + v: V) extends VertexProperty[V] { + import CanInnerValLike._ + implicit lazy val encodingVer = element.serviceColumn.schemaVersion + lazy val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(value) def toBytes: Array[Byte] = { innerVal.bytes } + val value = castValue(v, columnMeta.dataType).asInstanceOf[V] + override def properties[U](strings: String*): util.Iterator[Property[U]] = ??? - override def property[V](s: String, v: V): Property[V] = ??? + override def property[V](key: String, value: V): Property[V] = ??? override def remove(): Unit = ??? override def id(): AnyRef = ??? override def isPresent: Boolean = ??? + + override def hashCode(): Int = { + MurmurHash3.stringHash(columnMeta.columnId + "," + columnMeta.id.get + "," + key + "," + value) + } + + override def equals(other: Any): Boolean = other match { + case p: S2VertexProperty[_] => + columnMeta.columnId == p.columnMeta.columnId && + columnMeta.seq == p.columnMeta.seq && + key == p.key && value == p.value + case _ => false + } + + override def toString(): String = { + Map("columnMeta" -> columnMeta.toString, "key" -> key, "value" -> value).toString + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala deleted file mode 100644 index 57c9824..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.core - -import java.util - -import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn} -import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId} -import org.apache.tinkerpop.gremlin.structure -import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality -import org.apache.tinkerpop.gremlin.structure.{Vertex => TpVertex, Direction, Edge, VertexProperty, Graph} -import play.api.libs.json.Json - -case class Vertex(graph: Graph, - id: VertexId, - ts: Long = System.currentTimeMillis(), - props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], - op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with TpVertex { - - val innerId = id.innerId - - val innerIdVal = innerId.value - - lazy val properties = for { - (k, v) <- props - meta <- serviceColumn.metasMap.get(k) - } yield meta.name -> v.value - - def schemaVer = serviceColumn.schemaVersion - - def serviceColumn = ServiceColumn.findById(id.colId) - - def columnName = serviceColumn.columnName - - def service = Service.findById(serviceColumn.serviceId) - - lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName) - - def defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq.toInt -> InnerVal.withLong(ts, schemaVer)) - - // lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues - - /** TODO: make this as configurable */ - override def serviceName = service.serviceName - - override def isAsync = false - - override def queueKey = Seq(ts.toString, serviceName).mkString("|") - - override def queuePartitionKey = id.innerId.toString - - def propsWithName = for { - (seq, v) <- props - meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte) - } yield (meta.name -> v.toString) - - def toEdgeVertex() = graph.newVertex(SourceVertexId(id.column, innerId), ts, props, op) - - - override def hashCode() = { - val hash = id.hashCode() - // logger.debug(s"Vertex.hashCode: $this -> $hash") - hash - } - - override def equals(obj: Any) = { - obj match { - case otherVertex: Vertex => - val ret = id == otherVertex.id - // logger.debug(s"Vertex.equals: $this, $obj => $ret") - ret - case _ => false - } - } - - def withProps(newProps: Map[Int, InnerValLike]) = graph.newVertex(id, ts, newProps, op) - - def toLogString(): String = { - val (serviceName, columnName) = - if (!id.storeColId) ("", "") - else (serviceColumn.service.serviceName, serviceColumn.columnName) - - if (propsWithName.nonEmpty) - Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t") - else - Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t") - } - - override def vertices(direction: Direction, strings: String*): util.Iterator[TpVertex] = ??? - - override def edges(direction: Direction, strings: String*): util.Iterator[structure.Edge] = ??? - - override def property[V](cardinality: Cardinality, s: String, v: V, objects: AnyRef*): VertexProperty[V] = ??? - - override def addEdge(s: String, vertex: TpVertex, objects: AnyRef*): Edge = ??? - - override def properties[V](strings: String*): util.Iterator[VertexProperty[V]] = ??? - - override def remove(): Unit = ??? - - override def label(): String = ??? 
-} - -object Vertex { - - def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId - - def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue - - def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue - - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala index f6c174d..09d02d1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala @@ -30,6 +30,8 @@ object ColumnMeta extends Model[ColumnMeta] { val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long") val maxValue = Byte.MaxValue + val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq, "long") + def apply(rs: WrappedResultSet): ColumnMeta = { ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase()) } @@ -125,6 +127,14 @@ object ColumnMeta extends Model[ColumnMeta] { } case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) { - lazy val serviceColumn = ServiceColumn.findById(columnId) lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType) + override def equals(other: Any): Boolean = { + if (!other.isInstanceOf[ColumnMeta]) false + else { + val o = other.asInstanceOf[ColumnMeta] + // labelId == o.labelId && + seq == o.seq + } + } + override def hashCode(): Int = seq.toInt } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala index 6636649..4a7e931 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala @@ -191,7 +191,6 @@ case class LabelMeta(id: Option[Int], seq: Byte, defaultValue: String, dataType: String) { - lazy val label = Label.findById(labelId) lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType) override def equals(other: Any): Boolean = { if (!other.isInstanceOf[LabelMeta]) false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala index 85b6929..ebbbf88 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -29,12 +29,19 @@ import org.apache.s2graph.core.types.{HBaseType, InnerValLikeWithTs, InnerValLik import play.api.libs.json.Json import scalikejdbc._ object ServiceColumn extends Model[ServiceColumn] { - val Default = ServiceColumn(Option(HBaseType.DEFAULT_COL_ID), 0, "default", "string", "v4") + val Default = ServiceColumn(Option(0), -1, "default", 
"string", "v4") def apply(rs: WrappedResultSet): ServiceColumn = { ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version")) } +// def findByServiceAndColumn(serviceName: String, +// columnName: String, +// useCache: Boolean = true)(implicit session: DBSession): Option[ServiceColumn] = { +// val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) +// find(service.id.get, columnName, useCache) +// } + def findById(id: Int)(implicit session: DBSession = AutoSession): ServiceColumn = { // val cacheKey = s"id=$id" val cacheKey = "id=" + id @@ -95,18 +102,18 @@ object ServiceColumn extends Model[ServiceColumn] { case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) { lazy val service = Service.findById(serviceId) - lazy val metas = ColumnMeta.findAllByColumn(id.get) + lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType) - def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] = { + def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = { for { (k, v) <- props labelMeta <- metasInvMap.get(k) innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion) - } yield labelMeta.seq.toInt -> innerVal + } yield labelMeta -> innerVal } def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index aa018a9..d754bb7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException} import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.types.InnerValLike -import org.apache.s2graph.core.{Edge, GraphUtil} +import org.apache.s2graph.core.{S2Edge, GraphUtil} import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.utils.logger @@ -33,7 +33,7 @@ import scala.util.parsing.combinator.JavaTokenParsers trait ExtractValue { val parent = "_parent." 
- def propToInnerVal(edge: Edge, key: String) = { + def propToInnerVal(edge: S2Edge, key: String) = { val (propKey, parentEdge) = findParentEdge(edge, key) val label = parentEdge.innerLabel @@ -47,7 +47,7 @@ trait ExtractValue { } } - def valueToCompare(edge: Edge, key: String, value: String) = { + def valueToCompare(edge: S2Edge, key: String, value: String) = { val label = edge.innerLabel if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value) else { @@ -65,11 +65,11 @@ trait ExtractValue { } @tailrec - private def findParent(edge: Edge, depth: Int): Edge = + private def findParent(edge: S2Edge, depth: Int): S2Edge = if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1) else edge - private def findParentEdge(edge: Edge, key: String): (String, Edge) = { + private def findParentEdge(edge: S2Edge, key: String): (String, S2Edge) = { if (!key.startsWith(parent)) (key, edge) else { val split = key.split(parent) @@ -88,9 +88,9 @@ trait Clause extends ExtractValue { def or(otherField: Clause): Clause = Or(this, otherField) - def filter(edge: Edge): Boolean + def filter(edge: S2Edge): Boolean - def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: Edge): Boolean = { + def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2Edge): Boolean = { val propValue = propToInnerVal(edge, propKey) val compValue = valueToCompare(edge, propKey, value) @@ -105,20 +105,20 @@ object Where { } } case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) { - def filter(edge: Edge) = + def filter(edge: S2Edge) = if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity) } case class Gt(propKey: String, value: String) extends Clause { - override def filter(edge: Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge) + override def filter(edge: S2Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge) } case class Lt(propKey: String, value: String) extends Clause { - override def filter(edge: Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge) + override def filter(edge: S2Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge) } case class Eq(propKey: String, value: String) extends Clause { - override def filter(edge: Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge) + override def filter(edge: S2Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge) } case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause { @@ -144,7 +144,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e toInnerVal(value, dataType, label.schemaVersion) } - override def filter(edge: Edge): Boolean = { + override def filter(edge: S2Edge): Boolean = { if (edge.dir == GraphUtil.directions("in")) { val propVal = propToInnerVal(edge, propKey) innerValLikeLsIn.contains(propVal) @@ -156,7 +156,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e } case class IN(propKey: String, values: Set[String]) extends Clause { - override def filter(edge: Edge): Boolean = { + override def filter(edge: S2Edge): Boolean = { val propVal = propToInnerVal(edge, propKey) values.exists { value => valueToCompare(edge, propKey, value) == propVal @@ -165,7 +165,7 @@ case class IN(propKey: String, values: Set[String]) extends Clause { } case class Between(propKey: String, minValue: String, maxValue: String) extends Clause { - override def filter(edge: Edge): Boolean = { + override def filter(edge: S2Edge): 
Boolean = { val propVal = propToInnerVal(edge, propKey) val minVal = valueToCompare(edge, propKey, minValue) val maxVal = valueToCompare(edge, propKey, maxValue) @@ -175,15 +175,15 @@ case class Between(propKey: String, minValue: String, maxValue: String) extends } case class Not(self: Clause) extends Clause { - override def filter(edge: Edge) = !self.filter(edge) + override def filter(edge: S2Edge) = !self.filter(edge) } case class And(left: Clause, right: Clause) extends Clause { - override def filter(edge: Edge) = left.filter(edge) && right.filter(edge) + override def filter(edge: S2Edge) = left.filter(edge) && right.filter(edge) } case class Or(left: Clause, right: Clause) extends Clause { - override def filter(edge: Edge) = left.filter(edge) || right.filter(edge) + override def filter(edge: S2Edge) = left.filter(edge) || right.filter(edge) } object WhereParser { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 13e02a0..62d1e40 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -118,7 +118,7 @@ object RequestParser { } -class RequestParser(graph: Graph) { +class RequestParser(graph: S2Graph) { import Management.JsonModel._ import RequestParser._ @@ -261,7 +261,7 @@ class RequestParser(graph: Graph) { GroupBy(keys) }.getOrElse(GroupBy.Empty) - def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = { + def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[S2Vertex] = { val vertices = for { label <- Label.findByName(labelName).toSeq serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn @@ -547,12 +547,12 @@ class RequestParser(graph: Graph) { elementsWithTsv } - def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(Edge, String)] = { + def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = { val jsValues = toJsValues(jsValue) jsValues.flatMap(toEdgeWithTsv(_, operation)) } - private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = { + private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = { val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil) val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil) @@ -580,7 +580,7 @@ class RequestParser(graph: Graph) { toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName)) } - def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = { + def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): S2Vertex = { val id = parse[JsValue](jsValue, "id") val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis()) val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
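To illustrate how the TinkerPop-facing surface added in this commit is meant to be called, here is a minimal sketch (not part of the patch). It assumes an already-populated metastore: "age" and "city" are registered ColumnMeta/LabelMeta entries and "friend" is an existing label; those names are hypothetical. Only Cardinality.single vertex properties are supported, and S2Vertex.addEdge reads the reserved keys "direction", "_timestamp" and "operation" out of its key/value varargs, as the S2Vertex diff above shows.

import java.util
import org.apache.tinkerpop.gremlin.structure.{Direction, Vertex}
import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
import org.apache.s2graph.core.{S2Edge, S2Vertex}

def connect(alice: S2Vertex, bob: S2Vertex): S2Edge = {
  // single-cardinality property; "age" must be configured on the vertex's ServiceColumn,
  // otherwise S2Vertex.property throws a RuntimeException
  alice.property(Cardinality.single, "age", Int.box(20))

  // "direction", "_timestamp" and "operation" are picked out of the varargs by addEdge;
  // any other pair ("since" here) becomes an ordinary edge property
  alice.addEdge("friend", bob,
    "direction", "out",
    "_timestamp", Long.box(System.currentTimeMillis()),
    "since", "2017-01-01")
}

// adjacent vertices over "friend" edges, via the vertices()/edges() overrides added above
def friendsOf(v: S2Vertex): util.Iterator[Vertex] =
  v.vertices(Direction.OUT, "friend")

The WhereParser clauses retyped to S2Edge above compose the same way they did before the rename; a sketch, again assuming "age" and "city" are metas defined on the edge's label:

import org.apache.s2graph.core.parsers.{Gt, IN, Where}

def adultInSeoulOrBusan(edge: S2Edge): Boolean = {
  // Where.filter requires every clause to pass (forall over the clause list)
  val where = Where(Seq(Gt("age", "19"), IN("city", Set("seoul", "busan"))))
  where.filter(edge)
}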
