http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index ef40688..b481880 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -21,48 +21,46 @@ package org.apache.s2graph.core import com.google.common.hash.Hashing import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.DuplicatePolicy.DuplicatePolicy import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.parsers.{Where, WhereParser} -import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection} +import org.apache.s2graph.core.rest.TemplateHelper +import org.apache.s2graph.core.storage.StorageSerializable._ +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs, LabelWithDirection} import org.hbase.async.ColumnRangeFilter -import play.api.libs.json.{JsNull, JsNumber, JsValue, Json} +import play.api.libs.json.{JsString, JsNull, JsValue, Json} -import scala.util.hashing.MurmurHash3 import scala.util.{Success, Try} object Query { val initialScore = 1.0 lazy val empty = Query() - + def apply(query: Query): Query = { + Query(query.vertices, query.steps, query.queryOption, query.jsonQuery) + } def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = Query(srcVertices, Vector(Step(List(queryParam)))) - object DuplicatePolicy extends Enumeration { - type DuplicatePolicy = Value - val First, Sum, CountSum, Raw = Value - - def apply(policy: String): Value = { - policy match { - case "sum" => Query.DuplicatePolicy.Sum - case "countSum" => Query.DuplicatePolicy.CountSum - case "raw" => Query.DuplicatePolicy.Raw - case _ => DuplicatePolicy.First - } - } - } } +case class MinShouldMatchParam(prop: String, count: Int, terms: Set[Any]) + object GroupBy { val Empty = GroupBy() } case class GroupBy(keys: Seq[String] = Nil, - limit: Int = Int.MaxValue) + limit: Int = Int.MaxValue, + minShouldMatch: Option[MinShouldMatchParam]= None) case class MultiQuery(queries: Seq[Query], weights: Seq[Double], queryOption: QueryOption, jsonQuery: JsValue = JsNull) +object QueryOption { + val DefaultAscendingVals: Seq[Boolean] = Seq(false, false) +} + case class QueryOption(removeCycle: Boolean = false, selectColumns: Seq[String] = Seq.empty, groupBy: GroupBy = GroupBy.Empty, @@ -75,9 +73,27 @@ case class QueryOption(removeCycle: Boolean = false, returnAgg: Boolean = true, scoreThreshold: Double = Double.MinValue, returnDegree: Boolean = true, - impIdOpt: Option[String] = None) { + impIdOpt: Option[String] = None, + shouldPropagateScore: Boolean = true, + ignorePrevStepCache: Boolean = false) { val orderByKeys = orderByColumns.map(_._1) val ascendingVals = orderByColumns.map(_._2) + val selectColumnsMap = selectColumns.map { c => c -> true } .toMap + val scoreFieldIdx = orderByKeys.zipWithIndex.find(t => t._1 == "score").map(_._2).getOrElse(-1) + val (edgeSelectColumns, propsSelectColumns) = selectColumns.partition(c => LabelMeta.defaultRequiredMetaNames.contains(c)) + /** */ + val edgeSelectColumnsFiltered = edgeSelectColumns +// val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c)) + lazy val cacheKeyBytes: Array[Byte] = { + val selectBytes = Bytes.toBytes(selectColumns.toString) + val groupBytes = Bytes.toBytes(groupBy.keys.toString) + val orderByBytes = Bytes.toBytes(orderByColumns.toString) + val filterOutBytes = filterOutQuery.map(_.fullCacheBytes).getOrElse(Array.empty[Byte]) + val returnTreeBytes = Bytes.toBytes(returnTree) + + Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add) + } + } case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], @@ -85,57 +101,13 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], queryOption: QueryOption = QueryOption(), jsonQuery: JsValue = JsNull) { - val removeCycle = queryOption.removeCycle - val selectColumns = queryOption.selectColumns - val groupBy = queryOption.groupBy - val orderByColumns = queryOption.orderByColumns - val filterOutQuery = queryOption.filterOutQuery - val filterOutFields = queryOption.filterOutFields - val withScore = queryOption.withScore - val returnTree = queryOption.returnTree - val limitOpt = queryOption.limitOpt - val returnAgg = queryOption.returnAgg - val returnDegree = queryOption.returnDegree - - def cacheKeyBytes: Array[Byte] = { - val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString) - val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString) - val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString) - val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte]) - val returnTreeBytes = Bytes.toBytes(queryOption.returnTree) - - Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add) - } - - lazy val selectColumnsSet = queryOption.selectColumns.map { c => - if (c == "_from") "from" - else if (c == "_to") "to" - else c - }.toSet - - /** return logical query id without considering parameter values */ - def templateId(): JsValue = { - Json.toJson(for { - step <- steps - queryParam <- step.queryParams.sortBy(_.labelWithDir.labelId) - } yield { - Json.obj("label" -> queryParam.label.label, "direction" -> GraphUtil.fromDirection(queryParam.labelWithDir.dir)) - }) - } - - def impressionId(): JsNumber = { - val hash = MurmurHash3.stringHash(templateId().toString()) - JsNumber(hash) - } - - def cursorStrings(): Seq[Seq[String]] = { - //Don`t know how to replace all cursor keys in json - steps.map { step => - step.queryParams.map { queryParam => - queryParam.cursorOpt.getOrElse("") - } - } + lazy val fullCacheBytes = { + val srcBytes = vertices.map(_.innerId.bytes).foldLeft(Array.empty[Byte])(Bytes.add) + val stepBytes = steps.map(_.cacheKeyBytes).foldLeft(Array.empty[Byte])(Bytes.add) + val queryOptionBytes = queryOption.cacheKeyBytes + Bytes.add(srcBytes, stepBytes, queryOptionBytes) } + lazy val fullCacheKey: Long = Hashing.murmur3_128().hashBytes(fullCacheBytes).asLong() } object EdgeTransformer { @@ -148,7 +120,7 @@ object EdgeTransformer { * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold. * @param jsValue */ -case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) { +case class EdgeTransformer(jsValue: JsValue) { val Delimiter = "\\$" val targets = jsValue.asOpt[List[Vector[String]]].toList val fieldsLs = for { @@ -159,7 +131,8 @@ case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) { def toHashKeyBytes: Array[Byte] = if (isDefault) Array.empty[Byte] else Bytes.toBytes(jsValue.toString) - def replace(fmt: String, + def replace(queryParam: QueryParam, + fmt: String, values: Seq[InnerValLike], nextStepOpt: Option[Step]): Seq[InnerValLike] = { @@ -189,19 +162,19 @@ case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) { } } - def toInnerValOpt(edge: Edge, fieldName: String): Option[InnerValLike] = { + def toInnerValOpt(queryParam: QueryParam, edge: Edge, fieldName: String): Option[InnerValLike] = { fieldName match { case LabelMeta.to.name => Option(edge.tgtVertex.innerId) case LabelMeta.from.name => Option(edge.srcVertex.innerId) case _ => for { labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName) - value <- edge.propsWithTs.get(labelMeta.seq) + value <- edge.propsWithTs.get(labelMeta) } yield value.innerVal } } - def transform(edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { + def transform(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { if (isDefault) Seq(edge) else { val edges = for { @@ -209,10 +182,10 @@ case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) { innerVal <- { if (fields.size == 1) { val fieldName = fields.head - toInnerValOpt(edge, fieldName).toSeq + toInnerValOpt(queryParam, edge, fieldName).toSeq } else { val fmt +: fieldNames = fields - replace(fmt, fieldNames.flatMap(fieldName => toInnerValOpt(edge, fieldName)), nextStepOpt) + replace(queryParam, fmt, fieldNames.flatMap(fieldName => toInnerValOpt(queryParam, edge, fieldName)), nextStepOpt) } } } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = Option(edge)) @@ -227,17 +200,18 @@ object Step { val Delimiter = "|" } -case class Step(queryParams: List[QueryParam], +case class Step(queryParams: Seq[QueryParam], labelWeights: Map[Int, Double] = Map.empty, - // scoreThreshold: Double = 0.0, nextStepScoreThreshold: Double = 0.0, nextStepLimit: Int = -1, - cacheTTL: Long = -1) { + cacheTTL: Long = -1, + groupBy: GroupBy = GroupBy.Empty) { - lazy val excludes = queryParams.filter(_.exclude) - lazy val includes = queryParams.filterNot(_.exclude) - lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap +// lazy val excludes = queryParams.filter(_.exclude) +// lazy val includes = queryParams.filterNot(_.exclude) +// lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap + lazy val cacheKeyBytes = queryParams.map(_.toCacheKeyRaw(Array.empty[Byte])).foldLeft(Array.empty[Byte])(Bytes.add) def toCacheKey(lss: Seq[Long]): Long = Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong() // MurmurHash3.bytesHash(toCacheKeyRaw(lss)) @@ -265,359 +239,202 @@ case class VertexParam(vertices: Seq[Vertex]) { } -//object RankParam { -// def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = { -// new RankParam(labelId, keyAndWeights) -// } -//} +object RankParam { + val Default = RankParam() +} -case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = Seq.empty[(Byte, Double)]) { +case class RankParam(keySeqAndWeights: Seq[(LabelMeta, Double)] = Seq((LabelMeta.count, 1.0))) { // empty => Count lazy val rankKeysWeightsMap = keySeqAndWeights.toMap - def defaultKey() = { - this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0)) - this - } - def toHashKeyBytes(): Array[Byte] = { var bytes = Array.empty[Byte] - keySeqAndWeights.map { case (key, weight) => - bytes = Bytes.add(bytes, Array.fill(1)(key), Bytes.toBytes(weight)) + keySeqAndWeights.map { case (labelMeta, weight) => + bytes = Bytes.add(bytes, Array.fill(1)(labelMeta.seq), Bytes.toBytes(weight)) } bytes } } -case class S2Request(labelName: String, - direction: String = "out", - ts: Long = System.currentTimeMillis(), - options: Map[String, Any] = Map.empty) { - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) - val labelWithDir = LabelWithDirection(label.id.get, dir) - //TODO: need to merge options into queryParam. - val queryParam = QueryParam(labelWithDir, ts) - -} object QueryParam { - lazy val Empty = QueryParam(LabelWithDirection(0, 0)) + lazy val Empty = QueryParam(labelName = "") lazy val DefaultThreshold = Double.MinValue val Delimiter = "," val maxMetaByte = (-1).toByte val fillArray = Array.fill(100)(maxMetaByte) -} -case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) { - - import HBaseSerializable._ - import Query.DuplicatePolicy - import Query.DuplicatePolicy._ - - lazy val label = Label.findById(labelWithDir.labelId) - val DefaultKey = LabelIndex.DefaultSeq - val fullKey = DefaultKey - - var labelOrderSeq = fullKey - - var sample = -1 - var limit = 10 - var offset = 0 - var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1)) - - var duration: Option[(Long, Long)] = None - var isInverted: Boolean = false - - var columnRangeFilter: ColumnRangeFilter = null - - var hasFilters: Map[Byte, InnerValLike] = Map.empty[Byte, InnerValLike] - var where: Try[Where] = Success(WhereParser.success) - var whereRawOpt: Option[String] = None - var duplicatePolicy = DuplicatePolicy.First - var rpcTimeoutInMillis = 1000 - var maxAttempt = 2 - var includeDegree = false - var tgtVertexInnerIdOpt: Option[InnerValLike] = None - var cacheTTLInMillis: Long = -1L - var threshold = QueryParam.DefaultThreshold - var timeDecay: Option[TimeDecay] = None - var transformer: EdgeTransformer = EdgeTransformer(this, EdgeTransformer.DefaultJson) - var scorePropagateOp: String = "multiply" - var scorePropagateShrinkage: Long = 500 - var exclude = false - var include = false - var shouldNormalize= false - var cursorOpt: Option[String] = None - - var columnRangeFilterMinBytes = Array.empty[Byte] - var columnRangeFilterMaxBytes = Array.empty[Byte] - - lazy val srcColumnWithDir = label.srcColumnWithDir(labelWithDir.dir) - lazy val tgtColumnWithDir = label.tgtColumnWithDir(labelWithDir.dir) - - def toBytes(idxSeq: Byte, offset: Int, limit: Int, isInverted: Boolean): Array[Byte] = { - val front = Array[Byte](idxSeq, if (isInverted) 1.toByte else 0.toByte) - Bytes.add(front, Bytes.toBytes((offset.toLong << 32 | limit))) + def apply(labelWithDirection: LabelWithDirection): QueryParam = { + val label = Label.findById(labelWithDirection.labelId) + val direction = GraphUtil.fromDirection(labelWithDirection.dir) + QueryParam(labelName = label.label, direction = direction) } - - /** - * consider only I/O specific parameters. - * properties that is used on Graph.filterEdges should not be considered. - * @param bytes - * @return - */ - def toCacheKey(bytes: Array[Byte]): Long = { - val hashBytes = toCacheKeyRaw(bytes) - Hashing.murmur3_128().hashBytes(hashBytes).asLong() -// MurmurHash3.bytesHash(hashBytes) - } - - def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = { - val transformBytes = transformer.toHashKeyBytes +} +case class QueryParam(labelName: String, + direction: String = "out", + offset: Int = 0, + limit: Int = 100, + sample: Int = -1, + maxAttempt: Int = 2, + rpcTimeout: Int = 1000, + cacheTTLInMillis: Long = -1L, + indexName: String = LabelIndex.DefaultName, + where: Try[Where] = Success(WhereParser.success), + timestamp: Long = System.currentTimeMillis(), + threshold: Double = Double.MinValue, + rank: RankParam = RankParam.Default, + intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None, + durationOpt: Option[(Long, Long)] = None, + exclude: Boolean = false, + include: Boolean = false, + has: Map[String, Any] = Map.empty, + duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First, + includeDegree: Boolean = false, + scorePropagateShrinkage: Long = 500L, + scorePropagateOp: String = "multiply", + shouldNormalize: Boolean = false, + whereRawOpt: Option[String] = None, + cursorOpt: Option[String] = None, + tgtVertexIdOpt: Option[Any] = None, + edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson), + timeDecay: Option[TimeDecay] = None) { + import JSONParser._ + + //TODO: implement this. + lazy val whereHasParent = true + + lazy val label = Label.findByName(labelName).getOrElse(throw LabelNotExistException(labelName)) + lazy val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"not supported direction: $direction")) + + lazy val labelWithDir = LabelWithDirection(label.id.get, dir) + lazy val labelOrderSeq = + if (indexName == LabelIndex.DefaultName) LabelIndex.DefaultSeq + else label.indexNameMap.getOrElse(indexName, throw new RuntimeException(s"$indexName indexName is not found.")).seq + + lazy val tgtVertexInnerIdOpt = tgtVertexIdOpt.map { id => + val tmp = label.tgtColumnWithDir(dir) + toInnerVal(id, tmp.columnType, tmp.schemaVersion) + } + + def buildInterval(edgeOpt: Option[Edge]) = intervalOpt match { + case None => Array.empty[Byte] -> Array.empty[Byte] + case Some(interval) => + val (froms, tos) = interval + + val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte + val (maxBytes, minBytes) = paddingInterval(len, froms, tos, edgeOpt) + + maxBytes -> minBytes + } + + lazy val isSnapshotEdge = tgtVertexInnerIdOpt.isDefined + + /** since degree info is located on first always */ + lazy val (innerOffset, innerLimit) = if (intervalOpt.isEmpty) { + if (offset == 0) (offset, if (limit == Int.MaxValue) limit else limit + 1) + else (offset + 1, limit) + } else (offset, limit) + + lazy val optionalCacheKey: Array[Byte] = { + val transformBytes = edgeTransformer.toHashKeyBytes //TODO: change this to binrary format. val whereBytes = Bytes.toBytes(whereRawOpt.getOrElse("")) - val durationBytes = duration.map { case (min, max) => + val durationBytes = durationOpt.map { case (min, max) => val minTs = min / cacheTTLInMillis val maxTs = max / cacheTTLInMillis Bytes.add(Bytes.toBytes(minTs), Bytes.toBytes(maxTs)) } getOrElse Array.empty[Byte] -// Bytes.toBytes(duration.toString) + val conditionBytes = Bytes.add(transformBytes, whereBytes, durationBytes) - Bytes.add(Bytes.add(bytes, labelWithDir.bytes, toBytes(labelOrderSeq, offset, limit, isInverted)), rank.toHashKeyBytes(), - Bytes.add(columnRangeFilterMinBytes, columnRangeFilterMaxBytes, conditionBytes)) - } - def isInverted(isInverted: Boolean): QueryParam = { - this.isInverted = isInverted - this + // Interval cache bytes is moved to fetch method + Bytes.add(Bytes.add(toBytes(offset, limit), rank.toHashKeyBytes()), conditionBytes) } - def labelOrderSeq(labelOrderSeq: Byte): QueryParam = { - this.labelOrderSeq = labelOrderSeq - this + def toBytes(offset: Int, limit: Int): Array[Byte] = { + Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit)) } - def sample(n: Int): QueryParam = { - this.sample = n - this + def toCacheKey(bytes: Array[Byte]): Long = { + val hashBytes = toCacheKeyRaw(bytes) + Hashing.murmur3_128().hashBytes(hashBytes).asLong() } - def limit(offset: Int, limit: Int): QueryParam = { - /** since degree info is located on first always */ - this.limit = limit - this.offset = offset - - if (this.columnRangeFilter == null) { - if (offset == 0) this.limit = limit + 1 - else this.offset = offset + 1 - } - // this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset) - this + def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = { + Bytes.add(bytes, optionalCacheKey) } - def interval(fromTo: Option[(Seq[(Byte, InnerValLike)], Seq[(Byte, InnerValLike)])]): QueryParam = { - fromTo match { - case Some((from, to)) => interval(from, to) - case _ => this - } - } + private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[Edge]): Seq[(LabelMeta, InnerValLike)] = { + kvs.map { case (propKey, propValJs) => + propValJs match { + case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") => + val parentLen = in.split("_parent.").length - 1 + val edge = (0 until parentLen).foldLeft(edgeOpt.get) { case (acc, _) => acc.parentEdges.head.edge } - def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]) = { - val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray) - val toVal = propsToBytes(to) + val timePivot = edge.ts + val replaced = TemplateHelper.replaceVariable(timePivot, in).trim - toVal(0) = len - fromVal(0) = len + val (_propKey, _padding) = replaced.span(ch => !ch.isDigit && ch != '-' && ch != '+' && ch != ' ') + val propKey = _propKey.split("_parent.").last + val padding = Try(_padding.trim.toLong).getOrElse(0L) - val minMax = (toVal, fromVal) // inverted - minMax - } + val labelMeta = edge.label.metaPropsInvMap.getOrElse(propKey, throw new RuntimeException(s"$propKey not found in ${edge} labelMetas.")) - def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = { - val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte - val (minBytes, maxBytes) = paddingInterval(len, from, to) + val propVal = + if (InnerVal.isNumericType(labelMeta.dataType)) { + InnerVal.withLong(edge.props(labelMeta).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion) + } else { + edge.props(labelMeta) + } - this.columnRangeFilterMaxBytes = maxBytes - this.columnRangeFilterMinBytes = minBytes - this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true) - this - } + labelMeta -> propVal + case _ => + val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw new RuntimeException(s"$propKey not found in labelMetas.")) + val propVal = jsValueToInnerVal(propValJs, labelMeta.dataType, label.schemaVersion) - def duration(minMaxTs: Option[(Long, Long)]): QueryParam = { - minMaxTs match { - case Some((minTs, maxTs)) => duration(minTs, maxTs) - case _ => this + labelMeta -> propVal.get + } } } - def duration(minTs: Long, maxTs: Long): QueryParam = { - this.duration = Some((minTs, maxTs)) - this - } - - def rank(r: RankParam): QueryParam = { - this.rank = r - this - } - - def exclude(filterOut: Boolean): QueryParam = { - this.exclude = filterOut - this - } - - def include(filterIn: Boolean): QueryParam = { - this.include = filterIn - this - } - - def has(hasFilters: Map[Byte, InnerValLike]): QueryParam = { - this.hasFilters = hasFilters - this - } - - def where(whereTry: Try[Where]): QueryParam = { - this.where = whereTry - this - } - - def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = { - this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First) - this - } + def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[Edge] = None) = { + val fromInnerVal = convertToInner(froms, edgeOpt) + val toInnerVal = convertToInner(tos, edgeOpt) - def rpcTimeout(millis: Int): QueryParam = { - this.rpcTimeoutInMillis = millis - this - } - - def maxAttempt(attempt: Int): QueryParam = { - this.maxAttempt = attempt - this - } - - def includeDegree(includeDegree: Boolean): QueryParam = { - this.includeDegree = includeDegree - this - } - - def scorePropagateShrinkage(scorePropagateShrinkage: Long): QueryParam = { - this.scorePropagateShrinkage = scorePropagateShrinkage - this - } + val fromVal = Bytes.add(propsToBytes(fromInnerVal), QueryParam.fillArray) + val toVal = propsToBytes(toInnerVal) - def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = { - this.tgtVertexInnerIdOpt = other - this - } + toVal(0) = len + fromVal(0) = len - def cacheTTLInMillis(other: Long): QueryParam = { - this.cacheTTLInMillis = other - this + val minMax = (toVal, fromVal) // inverted + minMax } - def timeDecay(other: Option[TimeDecay]): QueryParam = { - this.timeDecay = other - this + def toLabelMetas(names: Seq[String]): Set[LabelMeta] = { + val m = for { + name <- names + labelMeta <- label.metaPropsInvMap.get(name) + } yield labelMeta + m.toSet } +} - def threshold(other: Double): QueryParam = { - this.threshold = other - this - } +object DuplicatePolicy extends Enumeration { + type DuplicatePolicy = Value + val First, Sum, CountSum, Raw = Value - def transformer(other: Option[JsValue]): QueryParam = { - other match { - case Some(js) => this.transformer = EdgeTransformer(this, js) - case None => + def apply(policy: String): Value = { + policy match { + case "sum" => DuplicatePolicy.Sum + case "countSum" => DuplicatePolicy.CountSum + case "raw" => DuplicatePolicy.Raw + case _ => DuplicatePolicy.First } - this - } - - def scorePropagateOp(scorePropagateOp: String): QueryParam = { - this.scorePropagateOp = scorePropagateOp - this - } - - def shouldNormalize(shouldNormalize: Boolean): QueryParam = { - this.shouldNormalize = shouldNormalize - this } - - def whereRawOpt(sqlOpt: Option[String]): QueryParam = { - this.whereRawOpt = sqlOpt - this - } - - def cursorOpt(cursorOpt: Option[String]): QueryParam = { - this.cursorOpt = cursorOpt - this - } - - def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined - - override def toString = { - List(label.label, labelOrderSeq, offset, limit, rank, - duration, isInverted, exclude, include, hasFilters).mkString("\t") - // duration, isInverted, exclude, include, hasFilters, outputFields).mkString("\t") - } - - // - // def buildGetRequest(srcVertex: Vertex) = { - // val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) - // val (srcInnerId, tgtInnerId) = tgtVertexInnerIdOpt match { - // case Some(tgtVertexInnerId) => // _to is given. - // /** we use toInvertedEdgeHashLike so dont need to swap src, tgt */ - // val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - // val tgt = InnerVal.convertVersion(tgtVertexInnerId, tgtColumn.columnType, label.schemaVersion) - // (src, tgt) - // case None => - // val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - // (src, src) - // } - // - // val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId)) - // val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) - // val edge = Edge(srcV, tgtV, labelWithDir) - // - // val get = if (tgtVertexInnerIdOpt.isDefined) { - // val snapshotEdge = edge.toInvertedEdgeHashLike - // val kv = snapshotEdge.kvs.head - // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) - // } else { - // val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == labelOrderSeq) - // assert(indexedEdgeOpt.isDefined) - // val indexedEdge = indexedEdgeOpt.get - // val kv = indexedEdge.kvs.head - // val table = label.hbaseTableName.getBytes - // //kv.table // - // val rowKey = kv.row // indexedEdge.rowKey.bytes - // val cf = edgeCf - // new GetRequest(table, rowKey, cf) - // } - // - // val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue)) - // - // get.maxVersions(1) - // get.setFailfast(true) - // get.setMaxResultsPerColumnFamily(limit) - // get.setRowOffsetPerColumnFamily(offset) - // get.setMinTimestamp(minTs) - // get.setMaxTimestamp(maxTs) - // get.setTimeout(rpcTimeoutInMillis) - // if (columnRangeFilter != null) get.setFilter(columnRangeFilter) - // // get.setMaxAttempt(maxAttempt.toByte) - // // get.setRpcTimeout(rpcTimeoutInMillis) - // - // // if (columnRangeFilter != null) get.filter(columnRangeFilter) - // // logger.debug(s"Get: $get, $offset, $limit") - // - // get - // } } - case class TimeDecay(initial: Double = 1.0, lambda: Double = 0.1, timeUnit: Double = 60 * 60 * 24, - labelMetaSeq: Byte = LabelMeta.timeStampSeq) { + labelMeta: LabelMeta = LabelMeta.timestamp) { def decay(diff: Double): Double = { //FIXME val ret = initial * Math.pow(1.0 - lambda, diff / timeUnit)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index 5b2622f..dd7e45d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -19,71 +19,106 @@ package org.apache.s2graph.core -import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs} +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} import org.apache.s2graph.core.utils.logger -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Seq, mutable} object QueryResult { - def fromVertices(query: Query): StepInnerResult = { + def fromVertices(graph: Graph, + query: Query): StepResult = { if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { - StepInnerResult.Empty + StepResult.Empty } else { val queryParam = query.steps.head.queryParams.head val label = queryParam.label val currentTs = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> + val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) - val edgeWithScoreLs = for { + val edgeWithScores = for { vertex <- query.vertices } yield { - val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) - val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) + val edge = Edge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) + val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, queryParam.label) edgeWithScore } - StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false) + StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil, false) } } } -/** inner traverse */ -object StepInnerResult { - val Failure = StepInnerResult(Nil, Nil, true) - val Empty = StepInnerResult(Nil, Nil, false) -} -case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore], - degreeEdges: Seq[EdgeWithScore], - isFailure: Boolean = false) { - val isEmpty = edgesWithScoreLs.isEmpty -} case class QueryRequest(query: Query, stepIdx: Int, vertex: Vertex, - queryParam: QueryParam) - + queryParam: QueryParam, + prevStepScore: Double = 1.0, + labelWeight: Double = 1.0) { + val nextStepOpt = + if (stepIdx < query.steps.size - 1) Option(query.steps(stepIdx + 1)) + else None +} -case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil, - tailCursor: Array[Byte] = Array.empty, - timestamp: Long = System.currentTimeMillis(), - isFailure: Boolean = false) +trait WithScore[T] { + def score(me: T): Double + def withNewScore(me: T, newScore: Double): T +} -case class EdgeWithScore(edge: Edge, score: Double) +object WithScore { + implicit val impEdgeWithScore = new WithScore[EdgeWithScore] { + override def score(me: EdgeWithScore): Double = me.score + override def withNewScore(me: EdgeWithScore, newScore: Double): EdgeWithScore = me.copy(score = newScore) + } +} +case class EdgeWithScore(edge: Edge, + score: Double, + label: Label, + orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues, + groupByValues: Seq[Option[Any]] = Nil, + stepGroupByValues: Seq[Option[Any]] = Nil, + filterOutValues: Seq[Option[Any]] = Nil, + accumulatedScores: Map[String, Double] = Map.empty) { + + def toValue(keyName: String): Option[Any] = keyName match { + case "from" | "_from" => Option(edge.srcId) + case "to" | "_to" => Option(edge.tgtId) + case "label" => Option(label.label) + case "direction" => Option(edge.dir) + case "score" => Option(score) + case _ => + label.metaPropsInvMap.get(keyName).flatMap { labelMeta => + edge.propsWithTs.get(labelMeta).orElse(label.metaPropsDefaultMapInner.get(labelMeta)).map(_.innerVal.value) + } + } + def toValues(keyNames: Seq[String]): Seq[Option[Any]] = for { + keyName <- keyNames + } yield toValue(keyName) +} /** result */ +case class StepResult(edgeWithScores: Seq[EdgeWithScore], + grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))], + degreeEdges: Seq[EdgeWithScore], + isFailure: Boolean = false, + accumulatedCursors: Seq[Seq[Array[Byte]]] = Nil, + cursors: Seq[Array[Byte]] = Nil, + failCount: Int = 0) { + // val isInnerEmpty = innerResults.isEmpty + val isEmpty = edgeWithScores.isEmpty +} object StepResult { - type Values = Seq[S2EdgeWithScore] + type Values = Seq[EdgeWithScore] type GroupByKey = Seq[Option[Any]] val EmptyOrderByValues = (None, None, None, None) val Empty = StepResult(Nil, Nil, Nil) - + val Failure = StepResult(Nil, Nil, Nil, true, failCount = 1) def mergeOrdered(left: StepResult.Values, right: StepResult.Values, @@ -99,177 +134,182 @@ object StepResult { } } + def filterOutStepGroupBy(edgesWithScores: Seq[EdgeWithScore], + groupBy: GroupBy): Seq[EdgeWithScore] = + if (groupBy == GroupBy.Empty) edgesWithScores + else { + groupBy.minShouldMatch match { + case None => edgesWithScores + case Some(minShouldMatch) => + val MinShouldMatchParam(propKey, count, terms) = minShouldMatch + + val grouped = edgesWithScores.groupBy { edgeWithScore => + edgeWithScore.stepGroupByValues + }.filter { case (key, edges) => + val filtered = edges.toStream.filter{ e => + e.toValue(propKey) match { + case None => false + case Some(v) => terms.contains(v) + } + }.take(count) + + filtered.lengthCompare(count) >= 0 + } + + grouped.values.flatten.toSeq + } + } + def orderBy(queryOption: QueryOption, notOrdered: Values): Values = { import OrderingUtil._ - - if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) { - notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](queryOption.ascendingVals)) + if (queryOption.withScore) { + val ascendingVals = if (queryOption.ascendingVals.isEmpty) QueryOption.DefaultAscendingVals else queryOption.ascendingVals + notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](ascendingVals)) } else { notOrdered } } - def toOrderByValues(s2Edge: Edge, - score: Double, - orderByKeys: Seq[String]): (Any, Any, Any, Any) = { - def toValue(propertyKey: String): Any = { - propertyKey match { - case "score" => score - case "timestamp" | "_timestamp" => s2Edge.ts - case _ => s2Edge.properties.get(propertyKey) - } + + def updateScoreOnOrderByValues(scoreFieldIndex: Int, + orderByValues: (Any, Any, Any, Any), + newScore: Double): (Any, Any, Any, Any) = { + scoreFieldIndex match { + case 0 => (newScore, orderByValues._2, orderByValues._3, orderByValues._4) + case 1 => (orderByValues._1, newScore, orderByValues._3, orderByValues._4) + case 2 => (orderByValues._1, orderByValues._2, newScore, orderByValues._4) + case 3 => (orderByValues._1, orderByValues._2, orderByValues._3, newScore) + case _ => orderByValues } - if (orderByKeys.isEmpty) (None, None, None, None) - else { - orderByKeys.length match { - case 1 => - (toValue(orderByKeys(0)), None, None, None) - case 2 => - (toValue(orderByKeys(0)), toValue(orderByKeys(1)), None, None) - case 3 => - (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), None) - case _ => - (toValue(orderByKeys(0)), toValue(orderByKeys(1)), toValue(orderByKeys(2)), toValue(orderByKeys(3))) - } + + } + + def toTuple4(values: Seq[Option[Any]]): (Any, Any, Any, Any) = { + values.length match { + case 1 => (values(0).getOrElse(None), None, None, None) + case 2 => (values(0).getOrElse(None), values(1).getOrElse(None), None, None) + case 3 => (values(0).getOrElse(None), values(1).getOrElse(None), values(2).getOrElse(None), None) + case _ => (values(0).getOrElse(None), values(1).getOrElse(None), values(2).getOrElse(None), values(3).getOrElse(None)) } } + /** * merge multiple StepResult into one StepResult. - * @param queryOption + * @param globalQueryOption * @param multiStepResults + * @param weights + * @param filterOutStepResult * @return */ - def merges(queryOption: QueryOption, + def merges(globalQueryOption: QueryOption, multiStepResults: Seq[StepResult], - weights: Seq[Double] = Nil): StepResult = { + weights: Seq[Double] = Nil, + filterOutStepResult: StepResult): StepResult = { val degrees = multiStepResults.flatMap(_.degreeEdges) - val ls = new mutable.ListBuffer[S2EdgeWithScore]() - val agg= new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]() + val ls = new mutable.ListBuffer[EdgeWithScore]() + val agg= new mutable.HashMap[GroupByKey, ListBuffer[EdgeWithScore]]() val sums = new mutable.HashMap[GroupByKey, Double]() + + val filterOutSet = filterOutStepResult.edgeWithScores.foldLeft(Set.empty[Seq[Option[Any]]]) { case (prev, t) => + prev + t.filterOutValues + } + for { (weight, eachStepResult) <- weights.zip(multiStepResults) - (ordered, grouped) = (eachStepResult.results, eachStepResult.grouped) + (ordered, grouped) = (eachStepResult.edgeWithScores, eachStepResult.grouped) } { ordered.foreach { t => - val newScore = t.score * weight - ls += t.copy(score = newScore) + val filterOutKey = t.filterOutValues + if (!filterOutSet.contains(filterOutKey)) { + val newScore = t.score * weight + val newT = t.copy(score = newScore) + + // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) + val newOrderByValues = + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.tsInnerVal, None, None) + else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) + + val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) + + ls += t.copy(score = newScore, orderByValues = newOrderByValues, groupByValues = newGroupByValues) + } } // process each query's stepResult's grouped for { (groupByKey, (scoreSum, values)) <- grouped } { - val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore]) + val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[EdgeWithScore]) var scoreSum = 0.0 + var isEmpty = true values.foreach { t => - val newScore = t.score * weight - buffer += t.copy(score = newScore) - scoreSum += newScore + val filterOutKey = t.filterOutValues + if (!filterOutSet.contains(filterOutKey)) { + isEmpty = false + val newScore = t.score * weight + val newT = t.copy(score = newScore) +// val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) + + val newOrderByValues = + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.tsInnerVal, None, None) + else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) + + val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) + + buffer += t.copy(score = newScore, orderByValues = newOrderByValues, groupByValues = newGroupByValues) + scoreSum += newScore + } } - sums += (groupByKey -> scoreSum) + if (!isEmpty) sums += (groupByKey -> scoreSum) } } // process global groupBy - if (queryOption.groupBy.keys.nonEmpty) { + val (ordered, grouped) = if (globalQueryOption.groupBy.keys.nonEmpty) { for { - s2EdgeWithScore <- ls - groupByKey = s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys) + edgeWithScore <- ls + groupByKey = edgeWithScore.groupByValues } { - val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[S2EdgeWithScore]) - buffer += s2EdgeWithScore - val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score + val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[EdgeWithScore]) + buffer += edgeWithScore + val newScore = sums.getOrElse(groupByKey, 0.0) + edgeWithScore.score sums += (groupByKey -> newScore) } + val grouped = for { + (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1) + aggregated = agg(groupByKey) if aggregated.nonEmpty + sorted = orderBy(globalQueryOption, aggregated) + } yield groupByKey -> (scoreSum, sorted) + (Nil, grouped) + } else { + val ordered = orderBy(globalQueryOption, ls) + (ordered, Nil) } - - val ordered = orderBy(queryOption, ls) - val grouped = for { - (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1) - aggregated = agg(groupByKey) if aggregated.nonEmpty - } yield groupByKey -> (scoreSum, aggregated) - - StepResult(results = ordered, grouped = grouped, degrees) + StepResult(edgeWithScores = ordered, grouped = grouped, degrees, failCount = multiStepResults.map(_.failCount).sum) } //TODO: Optimize this. def filterOut(graph: Graph, queryOption: QueryOption, baseStepResult: StepResult, - filterOutStepInnerResult: StepInnerResult): StepResult = { + filterOutStepResult: StepResult): StepResult = { - val fields = if (queryOption.filterOutFields.isEmpty) Seq("to") else Seq("to") - // else queryOption.filterOutFields - val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t => - t.edge.selectValues(fields) - }.toSet + val filterOutSet = filterOutStepResult.edgeWithScores.foldLeft(Set.empty[Seq[Option[Any]]]) { case (prev, t) => + prev + t.filterOutValues + } - val filteredResults = baseStepResult.results.filter { t => - val filterOutKey = t.s2Edge.selectValues(fields) + val filteredResults = baseStepResult.edgeWithScores.filter { t => + val filterOutKey = t.filterOutValues !filterOutSet.contains(filterOutKey) } val grouped = for { (key, (scoreSum, values)) <- baseStepResult.grouped - (out, in) = values.partition(v => filterOutSet.contains(v.s2Edge.selectValues(fields))) + (out, in) = values.partition(v => filterOutSet.contains(v.filterOutValues)) newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty } yield key -> (newScoreSum, in) - - StepResult(results = filteredResults, grouped = grouped, baseStepResult.degreeEdges) - } - def apply(graph: Graph, - queryOption: QueryOption, - stepInnerResult: StepInnerResult): StepResult = { - logger.debug(s"[BeforePostProcess]: ${stepInnerResult.edgesWithScoreLs.size}") - - val results = for { - edgeWithScore <- stepInnerResult.edgesWithScoreLs - } yield { - val edge = edgeWithScore.edge - val orderByValues = - if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, None, None) - else toOrderByValues(edge, edgeWithScore.score, queryOption.orderByKeys) - - S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, edgeWithScore.edge.parentEdges) - } - /** ordered flatten result */ - val ordered = orderBy(queryOption, results) - - /** ordered grouped result */ - val grouped = - if (queryOption.groupBy.keys.isEmpty) Nil - else { - val agg = new mutable.HashMap[GroupByKey, (Double, Values)]() - results.groupBy { s2EdgeWithScore => - s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, useToString = true) - }.map { case (k, ls) => - val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption) - /** - * watch out here. by calling toString on Any, we lose type information which will be used - * later for toJson. - */ - if (merged.nonEmpty) { - val newKey = merged.head.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false) - agg += (newKey -> (scoreSum, merged)) - } - } - agg.toSeq.sortBy(_._2._1 * -1) - } - - val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, t.score)) - StepResult(results = ordered, grouped = grouped, degreeEdges = degrees) + StepResult(edgeWithScores = filteredResults, grouped = grouped, baseStepResult.degreeEdges, cursors = baseStepResult.cursors, failCount = baseStepResult.failCount + filterOutStepResult.failCount) } } - -case class S2EdgeWithScore(s2Edge: Edge, - score: Double, - orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues, - parentEdges: Seq[EdgeWithScore] = Nil) - -case class StepResult(results: StepResult.Values, - grouped: Seq[(StepResult.GroupByKey, (Double, StepResult.Values))], - degreeEdges: StepResult.Values) { - val isEmpty = results.isEmpty -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala index 0377bd8..bbd71ec 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala @@ -23,49 +23,6 @@ import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn} import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId} import play.api.libs.json.Json -// -//object S2Vertex { -// def apply(graph: Graph, vertex: Vertex): S2Vertex = { -// S2Vertex(graph, -// vertex.serviceName, -// vertex.serviceColumn.columnName, -// vertex.innerIdVal, -// vertex.serviceColumn.innerValsToProps(vertex.props), -// vertex.ts, -// GraphUtil.fromOp(vertex.op) -// ) -// } -//} -// -//case class S2Vertex(graph: Graph, -// serviceName: String, -// columnName: String, -// id: Any, -// props: Map[String, Any] = Map.empty, -// ts: Long = System.currentTimeMillis(), -// operation: String = "insert") extends GraphElement { -// lazy val vertex = { -// val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) -// val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) -// val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) -// -// val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion)) -// val propsInner = column.propsToInnerVals(props) ++ -// Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion)) -// -// Vertex(srcVertexId, ts, propsInner, op) -// } -// -// val uniqueId = (serviceName, columnName, id) -// -// override def isAsync: Boolean = vertex.isAsync -// -// override def toLogString(): String = vertex.toLogString() -// -// override def queueKey: String = vertex.queueKey -// -// override def queuePartitionKey: String = vertex.queuePartitionKey -//} case class Vertex(id: VertexId, ts: Long = System.currentTimeMillis(), props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], @@ -109,22 +66,6 @@ case class Vertex(id: VertexId, meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte) } yield (meta.name -> v.toString) - // /** only used by bulk loader */ - // def buildPuts(): List[Put] = { - // // logger.error(s"put: $this => $rowKey") - //// val put = new Put(rowKey.bytes) - //// for ((q, v) <- qualifiersWithValues) { - //// put.addColumn(vertexCf, q, ts, v) - //// } - //// List(put) - // val kv = kvs.head - // val put = new Put(kv.row) - // kvs.map { kv => - // put.addColumn(kv.cf, kv.qualifier, kv.timestamp, kv.value) - // } - // List(put) - // } - def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala index bcd5f0a..43e5db8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala @@ -19,12 +19,10 @@ package org.apache.s2graph.core.mysqls -/** - * Created by shon on 8/5/15. - */ - import scalikejdbc._ +import scala.util.Try + object Bucket extends Model[Bucket] { val rangeDelimiter = "~" @@ -53,7 +51,7 @@ object Bucket extends Model[Bucket] { def toRange(str: String): Option[(Int, Int)] = { val range = str.split(rangeDelimiter) - if (range.length == 2) Option((range.head.toInt, range.last.toInt)) + if (range.length == 2) Option(range.head.toInt, range.last.toInt) else None } @@ -69,6 +67,24 @@ object Bucket extends Model[Bucket] { sql.single().apply() } } + + def insert(experiment: Experiment, modular: String, httpVerb: String, apiPath: String, + requestBody: String, timeout: Int, impressionId: String, + isGraphQuery: Boolean, isEmpty: Boolean) + (implicit session: DBSession = AutoSession): Try[Bucket] = { + Try { + sql""" + INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id, + is_graph_query, is_empty) + VALUES (${experiment.id.get}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId, + $isGraphQuery, $isEmpty) + """ + .updateAndReturnGeneratedKey().apply() + }.map { newId => + Bucket(Some(newId.toInt), experiment.id.get, modular, httpVerb, apiPath, requestBody, timeout, impressionId, + isGraphQuery, isEmpty) + } + } } case class Bucket(id: Option[Int], http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala index cd825f4..0b16449 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.mysqls import org.apache.s2graph.core.GraphUtil import scalikejdbc._ -import scala.util.Random +import scala.util.{Try, Random} object Experiment extends Model[Experiment] { val ImpressionKey = "S2-Impression-Id" @@ -60,6 +60,17 @@ object Experiment extends Model[Experiment] { .map { rs => Experiment(rs) }.single.apply ) } + + def insert(service: Service, name: String, description: String, experimentType: String = "t", totalModular: Int = 100) + (implicit session: DBSession = AutoSession): Try[Experiment] = { + Try { + sql"""INSERT INTO experiments(service_id, service_name, `name`, description, experiment_type, total_modular) + VALUES(${service.id.get}, ${service.serviceName}, $name, $description, $experimentType, $totalModular)""" + .updateAndReturnGeneratedKey().apply() + }.map { newId => + Experiment(Some(newId.toInt), service.id.get, name, description, experimentType, totalModular) + } + } } case class Experiment(id: Option[Int], http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index f7318f6..fdef677 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -21,13 +21,14 @@ package org.apache.s2graph.core.mysqls import java.util.Calendar +import com.typesafe.config.Config import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException import org.apache.s2graph.core.GraphUtil import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} -import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs} +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs} import org.apache.s2graph.core.utils.logger -import play.api.libs.json.{JsObject, JsValue, Json} +import play.api.libs.json.{JsArray, JsObject, JsValue, Json} import scalikejdbc._ object Label extends Model[Label] { @@ -57,7 +58,8 @@ object Label extends Model[Label] { sql""" select * from labels - where label = ${labelName}""".map { rs => Label(rs) }.single.apply() + where label = ${labelName} + and deleted_at is null """.map { rs => Label(rs) }.single.apply() if (useCache) withCache(cacheKey)(labelOpt) else labelOpt @@ -101,7 +103,8 @@ object Label extends Model[Label] { sql""" select * from labels - where id = ${id}""" + where id = ${id} + and deleted_at is null""" .map { rs => Label(rs) }.single.apply()) } @@ -111,7 +114,8 @@ object Label extends Model[Label] { sql""" select * from labels - where id = ${id}""" + where id = ${id} + and deleted_at is null""" .map { rs => Label(rs) }.single.apply()).get } @@ -124,6 +128,7 @@ object Label extends Model[Label] { from labels where tgt_column_name = ${col.columnName} and service_id = ${col.serviceId} + and deleted_at is null """.map { rs => Label(rs) }.list().apply()) } @@ -136,20 +141,21 @@ object Label extends Model[Label] { from labels where src_column_name = ${col.columnName} and service_id = ${col.serviceId} + and deleted_at is null """.map { rs => Label(rs) }.list().apply()) } def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { val cacheKey = "srcServiceId=" + serviceId withCaches(cacheKey)( - sql"""select * from labels where src_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply + sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply ) } def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { val cacheKey = "tgtServiceId=" + serviceId withCaches(cacheKey)( - sql"""select * from labels where tgt_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply + sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply ) } @@ -183,14 +189,14 @@ object Label extends Model[Label] { val tgtServiceId = tgtService.id.get val serviceId = service.id.get - /* insert serviceColumn */ + /** insert serviceColumn */ val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion) val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion) if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}") if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}") - /* create label */ + /** create label */ Label.findByName(labelName, useCache = false).getOrElse { val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType, @@ -224,7 +230,7 @@ object Label extends Model[Label] { } def findAll()(implicit session: DBSession = AutoSession) = { - val ls = sql"""select * from labels""".map { rs => Label(rs) }.list().apply() + val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply() putsToCache(ls.map { x => val cacheKey = s"id=${x.id.get}" (cacheKey -> x) @@ -233,6 +239,7 @@ object Label extends Model[Label] { val cacheKey = s"label=${x.label}" (cacheKey -> x) }) + ls } def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = { @@ -264,6 +271,21 @@ object Label extends Model[Label] { } cnt } + + def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = { + + logger.info(s"mark deleted label: $label") + val oldName = label.label + val now = Calendar.getInstance().getTime + val newName = s"deleted_${now.getTime}_"+ label.label + val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply() + val cacheKeys = List(s"id=${label.id}", s"label=${oldName}") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + cnt + } } case class Label(id: Option[Int], label: String, @@ -274,10 +296,9 @@ case class Label(id: Option[Int], label: String, schemaVersion: String, isAsync: Boolean = false, compressionAlgorithm: String, options: Option[String]) { + def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, useCache = useCache) - def metas = LabelMeta.findAllByLabelId(id.get) - - def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap + def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, useCache = useCache) // lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME) lazy val srcService = Service.findById(srcServiceId) @@ -356,83 +377,131 @@ case class Label(id: Option[Int], label: String, // } // }""")) - lazy val extraOptions: Map[String, JsValue] = options match { - case None => Map.empty - case Some(v) => - try { - Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty) - } catch { - case e: Exception => - logger.error(s"An error occurs while parsing the extra label option: ${label}", e) - Map.empty - } + lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) { + case JsArray(tokens) => tokens.map(_.as[String]).toSet + case _ => + logger.error("Invalid token JSON") + Set.empty[String] + } + + lazy val extraOptions = Model.extraOptions(options) + + lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true) + + lazy val storageConfigOpt: Option[Config] = toStorageConfig + + def toStorageConfig: Option[Config] = { + Model.toStorageConfig(extraOptions) } + def srcColumnWithDir(dir: Int) = { - if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn + // GraphUtil.directions("out" + if (dir == 0) srcColumn else tgtColumn } def tgtColumnWithDir(dir: Int) = { - if (dir == GraphUtil.directions("out")) tgtColumn else srcColumn + // GraphUtil.directions("out" + if (dir == 0) tgtColumn else srcColumn } - def srcTgtColumn(dir: Int) = - if (isDirected) { - (srcColumnWithDir(dir), tgtColumnWithDir(dir)) - } else { - if (dir == GraphUtil.directions("in")) { - (tgtColumn, srcColumn) - } else { - (srcColumn, tgtColumn) - } - } - - def init() = { - metas - metaSeqsToNames - service - srcColumn - tgtColumn - defaultIndex - indices - metaProps - } + lazy val tgtSrc = (tgtColumn, srcColumn) + lazy val srcTgt = (srcColumn, tgtColumn) + + def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt + + lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0)) +// def init() = { +// metas() +// metaSeqsToNames() +// service +// srcColumn +// tgtColumn +// defaultIndex +// indices +// metaProps +// } + + // def srcColumnInnerVal(jsValue: JsValue) = { + // jsValueToInnerVal(jsValue, srcColumnType, version) + // } + // def tgtColumnInnerVal(jsValue: JsValue) = { + // jsValueToInnerVal(jsValue, tgtColumnType, version) + // } override def toString(): String = { val orderByKeys = LabelMeta.findAllByLabelId(id.get) super.toString() + orderByKeys.toString() } - lazy val toJson = Json.obj("labelName" -> label, - "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson, - "isDirected" -> isDirected, - "serviceName" -> serviceName, - "consistencyLevel" -> consistencyLevel, - "schemaVersion" -> schemaVersion, - "isAsync" -> isAsync, - "compressionAlgorithm" -> compressionAlgorithm, - "defaultIndex" -> defaultIndex.map(x => x.toJson), - "extraIndex" -> extraIndices.map(exIdx => exIdx.toJson), - "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson) - ) - + // def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = { + // if (scoring.isEmpty) LabelIndex.defaultSeq + // else { + // LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq) + // + //// LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq) + // } + // } + lazy val toJson = { + val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false) + val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq, useCache = false) + val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && idx.id.get != defaultIdxOpt.get.id.get) + val metaProps = LabelMeta.reservedMetas.map { m => + if (m == LabelMeta.to) m.copy(dataType = tgtColumnType) + else if (m == LabelMeta.from) m.copy(dataType = srcColumnType) + else m + } ::: LabelMeta.findAllByLabelId(id.get, useCache = false) + + val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj()) + val optionsJs = try { + val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject] + if (!obj.value.contains("tokens")) obj + else obj ++ Json.obj("tokens" -> obj.value("tokens").as[Seq[String]].map("*" * _.length)) + + } catch { case e: Exception => Json.obj() } + + Json.obj("labelName" -> label, + "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson, + "isDirected" -> isDirected, + "serviceName" -> serviceName, + "consistencyLevel" -> consistencyLevel, + "schemaVersion" -> schemaVersion, + "isAsync" -> isAsync, + "compressionAlgorithm" -> compressionAlgorithm, + "defaultIndex" -> defaultIdx, + "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson), + "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson), + "options" -> optionsJs + ) + } def propsToInnerValsWithTs(props: Map[String, Any], - ts: Long = System.currentTimeMillis()): Map[Byte, InnerValLikeWithTs] = { + ts: Long = System.currentTimeMillis()): Map[LabelMeta, InnerValLikeWithTs] = { for { (k, v) <- props labelMeta <- metaPropsInvMap.get(k) innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion) - } yield labelMeta.seq -> InnerValLikeWithTs(innerVal, ts) + } yield labelMeta -> InnerValLikeWithTs(innerVal, ts) } - def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): Map[String, Any] = { - for { - (k, v) <- props - labelMeta <- metaPropsMap.get(k) - } yield { - labelMeta.name -> v.innerVal.value + def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs], + selectColumns: Map[Byte, Boolean]): Map[String, Any] = { + if (selectColumns.isEmpty) { + for { + (meta, v) <- metaPropsDefaultMapInner ++ props + } yield { + meta.name -> innerValToAny(v.innerVal, meta.dataType) + } + } else { + for { + (k, _) <- selectColumns + if k != LabelMeta.toSeq && k != LabelMeta.fromSeq + labelMeta <- metaPropsMap.get(k) + } yield { + val v = props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get + labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType) + } } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala index d7736bc..c548868 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala @@ -23,12 +23,14 @@ package org.apache.s2graph.core.mysqls * Created by shon on 6/3/15. */ -import play.api.libs.json.Json +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.core.utils.logger +import play.api.libs.json.{JsObject, JsString, Json} import scalikejdbc._ object LabelIndex extends Model[LabelIndex] { val DefaultName = "_PK" - val DefaultMetaSeqs = Seq(LabelMeta.timeStampSeq) + val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq) val DefaultSeq = 1.toByte val MaxOrderSeq = 7 @@ -144,16 +146,11 @@ object LabelIndex extends Model[LabelIndex] { } } -/** - * formular - * ex1): w1, w2, w3 - * ex2): 1.5 * w1^2 + 3.4 * (w1 * w2), w2, w1 - */ - case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String) { lazy val label = Label.findById(labelId) lazy val metas = label.metaPropsMap lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq)) + lazy val sortKeyTypesArray = sortKeyTypes.toArray lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name } lazy val toJson = Json.obj( "name" -> name, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala index ef5d90a..4a7e931 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala @@ -32,40 +32,61 @@ object LabelMeta extends Model[LabelMeta] { /** dummy sequences */ - val fromSeq = -4.toByte - val toSeq = -5.toByte - val lastOpSeq = -3.toByte - val lastDeletedAt = -2.toByte - val timeStampSeq = 0.toByte + val fromSeq = (-4).toByte + val toSeq = (-5).toByte + val lastOpSeq = (-3).toByte + val lastDeletedAtSeq = (-2).toByte + val timestampSeq = (0).toByte + val labelSeq = (-6).toByte + val directionSeq = -7.toByte + val fromHashSeq = -8.toByte + val countSeq = (Byte.MaxValue - 2).toByte val degreeSeq = (Byte.MaxValue - 1).toByte val maxValue = Byte.MaxValue - val emptyValue = Byte.MaxValue + val emptySeq = Byte.MaxValue /** reserved sequences */ // val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt", // seq = lastDeletedAt, defaultValue = "", dataType = "long") + val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = "_from_hash", + seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long") val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from", - seq = fromSeq, defaultValue = fromSeq.toString, dataType = "long") + seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string") val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to", - seq = toSeq, defaultValue = toSeq.toString, dataType = "long") + seq = toSeq, defaultValue = toSeq.toString, dataType = "string") val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp", - seq = timeStampSeq, defaultValue = "0", dataType = "long") + seq = timestampSeq, defaultValue = "0", dataType = "long") val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree", seq = degreeSeq, defaultValue = "0", dataType = "long") val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count", seq = countSeq, defaultValue = "-1", dataType = "long") + val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = "_lastDeletedAt", + seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long") + val label = LabelMeta(id = Some(-1), labelId = -1, name = "label", + seq = labelSeq, defaultValue = "", dataType = "string") + val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction", + seq = directionSeq, defaultValue = "out", dataType = "string") + val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty", + seq = emptySeq, defaultValue = "-1", dataType = "long") // Each reserved column(_timestamp, timestamp) has same seq number, starts with '_' has high priority - val reservedMetas = List(from, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse - val reservedMetasInner = List(from, to, degree, timestamp, count) + val reservedMetas = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse + val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count) + + val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp") def apply(rs: WrappedResultSet): LabelMeta = { LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase) } - def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq - def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq + /** Note: DegreeSeq should not be included in serializer/deserializer. + * only 0 <= seq <= CountSeq(Int.MaxValue - 2), not DegreeSet(Int.MaxValue - 1) should be + * included in actual bytes in storage. + * */ + def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq == fromHashSeq + + def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = { val cacheKey = "id=" + id @@ -164,6 +185,21 @@ object LabelMeta extends Model[LabelMeta] { } } -case class LabelMeta(id: Option[Int], labelId: Int, name: String, seq: Byte, defaultValue: String, dataType: String) { +case class LabelMeta(id: Option[Int], + labelId: Int, + name: String, + seq: Byte, + defaultValue: String, + dataType: String) { lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType) + override def equals(other: Any): Boolean = { + if (!other.isInstanceOf[LabelMeta]) false + else { + val o = other.asInstanceOf[LabelMeta] +// labelId == o.labelId && + seq == o.seq + } + } + override def hashCode(): Int = seq.toInt +// (labelId, seq).hashCode() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala index bf6dfb7..7a18a49 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala @@ -21,8 +21,10 @@ package org.apache.s2graph.core.mysqls import java.util.concurrent.Executors -import com.typesafe.config.Config +import com.typesafe.config.{ConfigFactory, Config} +import org.apache.s2graph.core.JSONParser import org.apache.s2graph.core.utils.{SafeUpdateCache, logger} +import play.api.libs.json.{Json, JsObject, JsValue} import scalikejdbc._ import scala.concurrent.ExecutionContext @@ -36,6 +38,7 @@ object Model { val numOfThread = Runtime.getRuntime.availableProcessors() val threadPool = Executors.newFixedThreadPool(numOfThread) val ec = ExecutionContext.fromExecutor(threadPool) + val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8" def apply(config: Config) = { maxSize = config.getInt("cache.max.size") @@ -116,6 +119,34 @@ object Model { LabelIndex.findAll() ColumnMeta.findAll() } + + def extraOptions(options: Option[String]): Map[String, JsValue] = options match { + case None => Map.empty + case Some(v) => + try { + Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty) + } catch { + case e: Exception => + logger.error(s"An error occurs while parsing the extra label option", e) + Map.empty + } + } + + def toStorageConfig(options: Map[String, JsValue]): Option[Config] = { + try { + options.get("storage").map { jsValue => + import scala.collection.JavaConverters._ + val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) => + key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!")) + } + ConfigFactory.parseMap(configMap.asJava) + } + } catch { + case e: Exception => + logger.error(s"toStorageConfig error. use default storage", e) + None + } + } } trait Model[V] extends SQLSyntaxSupport[V] { @@ -145,5 +176,9 @@ trait Model[V] extends SQLSyntaxSupport[V] { def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach { case (key, values) => listCache.put(key, values) } + + def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) = { + (optionCache.getAllData(), listCache.getAllData()) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala index 3e01014..7fdda45 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -21,6 +21,7 @@ package org.apache.s2graph.core.mysqls import java.util.UUID +import com.typesafe.config.Config import org.apache.s2graph.core.utils.logger import play.api.libs.json.Json import scalikejdbc._ @@ -94,6 +95,7 @@ object Service extends Model[Service] { val cacheKey = s"serviceName=${x.serviceName}" (cacheKey -> x) }) + ls } def findAllConn()(implicit session: DBSession = AutoSession): List[String] = { @@ -101,7 +103,14 @@ object Service extends Model[Service] { } } -case class Service(id: Option[Int], serviceName: String, accessToken: String, cluster: String, hTableName: String, preSplitSize: Int, hTableTTL: Option[Int]) { +case class Service(id: Option[Int], + serviceName: String, + accessToken: String, + cluster: String, + hTableName: String, + preSplitSize: Int, + hTableTTL: Option[Int], + options: Option[String] = None) { lazy val toJson = id match { case Some(_id) => @@ -110,4 +119,8 @@ case class Service(id: Option[Int], serviceName: String, accessToken: String, cl case None => Json.parse("{}") } + + lazy val extraOptions = Model.extraOptions(options) + lazy val storageConfigOpt: Option[Config] = toStorageConfig + def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index 1c93667..effb94b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -19,11 +19,13 @@ package org.apache.s2graph.core.parsers -import org.apache.s2graph.core.GraphExceptions.WhereParserException +import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException} import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.types.InnerValLike -import org.apache.s2graph.core.Edge +import org.apache.s2graph.core.{Edge, GraphUtil} import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.utils.logger + import scala.annotation.tailrec import scala.util.Try import scala.util.parsing.combinator.JavaTokenParsers @@ -37,12 +39,11 @@ trait ExtractValue { val label = parentEdge.label val metaPropInvMap = label.metaPropsInvMap val labelMeta = metaPropInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) - val metaSeq = labelMeta.seq - metaSeq match { - case LabelMeta.from.seq => parentEdge.srcVertex.innerId - case LabelMeta.to.seq => parentEdge.tgtVertex.innerId - case _ => parentEdge.propsWithTs.get(metaSeq) match { + labelMeta match { + case LabelMeta.from => parentEdge.srcVertex.innerId + case LabelMeta.to => parentEdge.tgtVertex.innerId + case _ => parentEdge.propsWithTs.get(labelMeta) match { case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion) case Some(edgeVal) => edgeVal.innerVal } @@ -99,7 +100,13 @@ trait Clause extends ExtractValue { binOp(propValue, compValue) } } - +object Where { + def apply(labelName: String, sql: String): Try[Where] = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + val parser = new WhereParser(label) + parser.parse(sql) + } +} case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) { def filter(edge: Edge) = if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity) @@ -118,18 +125,36 @@ case class Eq(propKey: String, value: String) extends Clause { } case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause { - val innerValLikeLs = values.map { value => + lazy val innerValLikeLsOut = values.map { value => val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) val dataType = propKey match { case "_to" | "to" => label.tgtColumn.columnType case "_from" | "from" => label.srcColumn.columnType case _ => labelMeta.dataType } + + toInnerVal(value, dataType, label.schemaVersion) + } + + lazy val innerValLikeLsIn = values.map { value => + val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) + val dataType = propKey match { + case "_to" | "to" => label.srcColumn.columnType + case "_from" | "from" => label.tgtColumn.columnType + case _ => labelMeta.dataType + } + toInnerVal(value, dataType, label.schemaVersion) } + override def filter(edge: Edge): Boolean = { - val propVal = propToInnerVal(edge, propKey) - innerValLikeLs.contains(propVal) + if (edge.dir == GraphUtil.directions("in")) { + val propVal = propToInnerVal(edge, propKey) + innerValLikeLsIn.contains(propVal) + } else { + val propVal = propToInnerVal(edge, propKey) + innerValLikeLsOut.contains(propVal) + } } } @@ -166,11 +191,15 @@ case class Or(left: Clause, right: Clause) extends Clause { object WhereParser { val success = Where() + } case class WhereParser(label: Label) extends JavaTokenParsers { - val anyStr = "[^\\s(),]+".r + + override val stringLiteral = (("'" ~> "(\\\\'|[^'])*".r <~ "'" ) ^^ (_.replace("\\'", "'"))) | anyStr + + val anyStr = "[^\\s(),']+".r val and = "and|AND".r @@ -190,33 +219,35 @@ case class WhereParser(label: Label) extends JavaTokenParsers { def identWithDot: Parser[String] = repsep(ident, ".") ^^ { case values => values.mkString(".") } - def predicate = { - identWithDot ~ ("!=" | "=") ~ anyStr ^^ { - case f ~ op ~ s => - if (op == "=") Eq(f, s) - else Not(Eq(f, s)) - } | identWithDot ~ (">=" | "<=" | ">" | "<") ~ anyStr ^^ { - case f ~ op ~ s => op match { - case ">" => Gt(f, s) - case ">=" => Or(Gt(f, s), Eq(f, s)) - case "<" => Lt(f, s) - case "<=" => Or(Lt(f, s), Eq(f, s)) - } - } | identWithDot ~ (between ~> anyStr <~ and) ~ anyStr ^^ { - case f ~ minV ~ maxV => Between(f, minV, maxV) - } | identWithDot ~ (notIn | in) ~ ("(" ~> repsep(anyStr, ",") <~ ")") ^^ { - case f ~ op ~ values => - val inClause = - if (f.startsWith("_parent")) IN(f, values.toSet) - else InWithoutParent(label, f, values.toSet) - if (op.toLowerCase == "in") inClause - else Not(inClause) - - - case _ => throw WhereParserException(s"Failed to parse where clause. ") + val _eq = identWithDot ~ ("!=" | "=") ~ stringLiteral ^^ { + case f ~ op ~ s => if (op == "=") Eq(f, s) else Not(Eq(f, s)) + } + + val _ltGt = identWithDot ~ (">=" | "<=" | ">" | "<") ~ stringLiteral ^^ { + case f ~ op ~ s => op match { + case ">" => Gt(f, s) + case ">=" => Or(Gt(f, s), Eq(f, s)) + case "<" => Lt(f, s) + case "<=" => Or(Lt(f, s), Eq(f, s)) } } + val _between = identWithDot ~ (between ~> stringLiteral <~ and) ~ stringLiteral ^^ { + case f ~ minV ~ maxV => Between(f, minV, maxV) + } + + val _in = identWithDot ~ (notIn | in) ~ ("(" ~> repsep(stringLiteral, ",") <~ ")") ^^ { + case f ~ op ~ values => + val inClause = + if (f.startsWith("_parent")) IN(f, values.toSet) + else InWithoutParent(label, f, values.toSet) + + if (op.toLowerCase == "in") inClause + else Not(inClause) + } + + def predicate = _eq | _ltGt | _between | _in + def parse(sql: String): Try[Where] = Try { parseAll(where, sql) match { case Success(r, q) => r
