Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 66bdf1bc0 -> f7154bac9
[S2GRAPH-123]: Support different index on out/in direction JIRA: [S2GRAPH-123] https://issues.apache.org/jira/browse/S2GRAPH-123 Pull Request: Closes #98 Authors DO YUNG YOON: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f7154bac Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f7154bac Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f7154bac Branch: refs/heads/master Commit: f7154bac9d18f7a7a16098587f69483a773cf2b6 Parents: 66bdf1b Author: DO YUNG YOON <[email protected]> Authored: Wed Nov 16 18:22:58 2016 +0100 Committer: DO YUNG YOON <[email protected]> Committed: Wed Nov 16 18:22:58 2016 +0100 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/s2graph/core/mysqls/schema.sql | 21 ++--- .../scala/org/apache/s2graph/core/Edge.scala | 22 +++++- .../org/apache/s2graph/core/Management.scala | 7 +- .../org/apache/s2graph/core/mysqls/Label.scala | 4 +- .../apache/s2graph/core/mysqls/LabelIndex.scala | 81 ++++++++++++++++---- .../s2graph/core/rest/RequestParser.scala | 36 +++++++-- .../apache/s2graph/core/storage/Storage.scala | 5 +- 8 files changed, 137 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 01d1cc8..1c5c01e 100644 --- a/CHANGES +++ b/CHANGES @@ -44,6 +44,8 @@ Release 0.1.0 - unreleased (Contributed by Hyunsung Jo<[email protected]>, committed by DOYUNG YOON). S2GRAPH-125: Add options field on Label model for controlling advanced options (Committed by DOYUNG YOON). + + S2GRAPH-123: Support different index on out/in direction (Committed by DOYUNG YOON). 
IMPROVEMENT http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql index 822df6c..b5e09c9 100644 --- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql +++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql @@ -136,15 +136,18 @@ ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELET -- ---------------------------- DROP TABLE IF EXISTS `label_indices`; CREATE TABLE `label_indices` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `label_id` int(11) NOT NULL, - `name` varchar(64) NOT NULL DEFAULT '_PK', - `seq` tinyint(4) NOT NULL, - `meta_seqs` varchar(64) NOT NULL, - `formulars` varchar(255) DEFAULT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_label_indices_label_id_seq` (`label_id`,`meta_seqs`), - UNIQUE KEY `ux_label_indices_label_id_name` (`label_id`,`name`) + `id` int(11) NOT NULL AUTO_INCREMENT, + `label_id` int(11) NOT NULL, + `name` varchar(64) NOT NULL DEFAULT '_PK', + `seq` tinyint(4) NOT NULL, + `meta_seqs` varchar(64) NOT NULL, + `formulars` varchar(255) DEFAULT NULL, + `dir` int DEFAULT NULL, + `options` text, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`), + UNIQUE KEY `ux_label_id_name` (`label_id`,`name`), + UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE; http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 657cfed..8a2784d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -448,6 +448,24 @@ object Edge { } } + def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie => + ie.labelIndex.dir match { + case None => + // both direction use same indices that is defined when label creation. + true + case Some(dir) => + if (dir != ie.direction) { + // current labelIndex's direction is different with indexEdge's direction so don't touch + false + } else { + ie.labelIndex.writeOption.map { option => + val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map{ case (k, v) => v.value.toString.toLong } + option.sample(ie, hashValueOpt) + }.getOrElse(true) + } + } + } + def buildMutation(snapshotEdgeOpt: Option[Edge], requestEdge: Edge, newVersion: Long, @@ -477,7 +495,7 @@ object Edge { val edgesToDelete = snapshotEdgeOpt match { case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") => snapshotEdge.copy(op = GraphUtil.defaultOpByte) - .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } + .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } case _ => Nil } @@ -488,7 +506,7 @@ object Edge { version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte - ).relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } + ).relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 89acc63..60900be 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -43,8 +43,7 @@ object Management { object Prop extends ((String, String, String) => Prop) - case class Index(name: String, propNames: Seq[String]) - + case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) } import HBaseType._ @@ -135,7 +134,7 @@ object Management { indices.foreach { index => val metaSeq = index.propNames.map { name => labelMetaMap(name).seq } - LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none") + LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none", index.direction, index.options) } label @@ -340,7 +339,7 @@ class Management(graph: Graph) { val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists.")) val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } - val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames) } + val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames, index.dir, index.options) } createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index 
fdef677..09d15d7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -211,11 +211,11 @@ object Label extends Model[Label] { if (indices.isEmpty) { // make default index with _PK, _timestamp, 0 - LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none") + LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None) } else { indices.foreach { index => val metaSeq = index.propNames.map { name => labelMetaMap(name) } - LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none") + LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala index c548868..7b1cd07 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala @@ -24,6 +24,7 @@ package org.apache.s2graph.core.mysqls */ import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.core.mysqls.LabelIndex.WriteOption import org.apache.s2graph.core.utils.logger import play.api.libs.json.{JsObject, JsString, Json} import scalikejdbc._ @@ -39,7 +40,30 @@ object LabelIndex extends Model[LabelIndex] { rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match { case metaSeqsList => metaSeqsList }, - rs.string("formulars")) + rs.string("formulars"), + rs.intOpt("dir"), + rs.stringOpt("options") + ) + } + object WriteOption { + val Default = WriteOption() + } + case class 
WriteOption(method: String = "default", + rate: Double = 1.0, + totalModular: Long = 100, + storeDegree: Boolean = true) { + + def sample[T](a: T, hashOpt: Option[Long]): Boolean = { + if (method == "drop") false + else if (method == "sample") { + if (scala.util.Random.nextDouble() < rate) true + else false + } else if (method == "hash_sample") { + val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value")) + if ((hash.abs % totalModular) / totalModular.toDouble < rate) true + else false + } else true + } } def findById(id: Int)(implicit session: DBSession = AutoSession) = { @@ -62,24 +86,27 @@ object LabelIndex extends Model[LabelIndex] { } } - def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): Long = { + def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String, + direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = { sql""" - insert into label_indices(label_id, name, seq, meta_seqs, formulars) - values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}) + insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options) + values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options}) """ .updateAndReturnGeneratedKey.apply() } - def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = { - findByLabelIdAndSeqs(labelId, metaSeqs) match { + def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String, + direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = { + findByLabelIdAndSeqs(labelId, metaSeqs, direction) match { case Some(s) => s case None => val orders = findByLabelIdAll(labelId, false) val seq = 
(orders.size + 1).toByte assert(seq <= MaxOrderSeq) - val createdId = insert(labelId, indexName, seq, metaSeqs, formulars) + val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options) val cacheKeys = List(s"labelId=$labelId:seq=$seq", - s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", s"id=$createdId") + s"labelId=$labelId:seqs=$metaSeqs:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId") + cacheKeys.foreach { key => expireCache(key) expireCaches(key) @@ -89,11 +116,11 @@ object LabelIndex extends Model[LabelIndex] { } } - def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[LabelIndex] = { - val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = { + val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction withCache(cacheKey) { sql""" - select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} + select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction} """.map { rs => LabelIndex(rs) }.single.apply } } @@ -118,7 +145,7 @@ object LabelIndex extends Model[LabelIndex] { val (labelId, seq) = (labelIndex.labelId, labelIndex.seq) sql"""delete from label_indices where id = ${id}""".execute.apply() - val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs") + val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}") cacheKeys.foreach { key => expireCache(key) expireCaches(key) @@ -136,7 +163,7 @@ object LabelIndex extends Model[LabelIndex] { (cacheKey -> x) }) putsToCache(ls.map { x => - var cacheKey = 
s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}" + var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}" (cacheKey -> x) }) putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) => @@ -146,14 +173,38 @@ object LabelIndex extends Model[LabelIndex] { } } -case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String) { +case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String, + dir: Option[Int], options: Option[String]) { + // both lazy val label = Label.findById(labelId) lazy val metas = label.metaPropsMap lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq)) lazy val sortKeyTypesArray = sortKeyTypes.toArray lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name } + + val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both") + val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() } lazy val toJson = Json.obj( "name" -> name, - "propNames" -> sortKeyTypes.map(x => x.name) + "propNames" -> sortKeyTypes.map(x => x.name), + "dir" -> dirJs, + "options" -> optionsJs ) + + lazy val writeOption: Option[WriteOption] = try { + options.map { string => + val jsObj = Json.parse(string) + val method = (jsObj \ "method").as[String] + + val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0) + val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L) + val storeDegree = (jsObj \ "degree").asOpt[Boolean].getOrElse(true) + + WriteOption(method, rate, totalModular, storeDegree) + } + } catch { + case e: Exception => + logger.error(s"Parse failed labelOption: ${this.label}", e) + None + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff 
--git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 5466a9a..8baf787 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -611,11 +611,23 @@ class RequestParser(graph: Graph) { Prop(propName, defaultValue, dataType) } - def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = for { - jsObj <- jsValue.as[Seq[JsValue]] - indexName = (jsObj \ "name").as[String] - propNames = (jsObj \ "propNames").as[Seq[String]] - } yield Index(indexName, propNames) + def toIndicesElements(jsValue: JsLookupResult): Seq[Index] = { + val indices = for { + jsObj <- jsValue.as[Seq[JsValue]] + indexName = (jsObj \ "name").as[String] + propNames = (jsObj \ "propNames").as[Seq[String]] + direction = (jsObj \ "direction").asOpt[String].map(GraphUtil.toDirection) + options = (jsObj \ "options").asOpt[JsValue].map(_.toString) + } yield { + Index(indexName, propNames, direction, options) + } + + val (pk, others) = indices.partition(index => index.name == LabelIndex.DefaultName) + val (both, inOut) = others.partition(index => index.direction.isEmpty) + val (in, out) = inOut.partition(index => index.direction.get == GraphUtil.directions("in")) + + pk ++ both ++ in ++ out + } def toLabelElements(jsValue: JsValue) = Try { val labelName = parse[String](jsValue, "label") @@ -642,8 +654,8 @@ class RequestParser(graph: Graph) { val options = (jsValue \ "options").asOpt[JsValue].map(_.toString()) (labelName, srcServiceName, srcColumnName, srcColumnType, - tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, - indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options) + tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, + indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, 
isAsync, compressionAlgorithm, options) } def toIndexElements(jsValue: JsValue) = Try { @@ -703,6 +715,16 @@ class RequestParser(graph: Graph) { (labels, direction, ids, ts, vertices) } + def toFetchAndDeleteParam(json: JsValue) = { + val labelName = (json \ "label").as[String] + val fromOpt = (json \ "from").asOpt[JsValue] + val toOpt = (json \ "to").asOpt[JsValue] + val direction = (json \ "direction").asOpt[String].getOrElse("out") + val indexOpt = (json \ "index").asOpt[String] + val propsOpt = (json \ "props").asOpt[JsObject] + (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt) + } + def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj => def _require(field: String) = throw new RuntimeException(s"${field} not found") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f7154bac/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 07e39aa..f2b07cd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -1035,8 +1035,9 @@ abstract class Storage[Q, R](val graph: Graph, def incrementsInOut(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = { - def filterOutDegree(e: IndexEdge): Boolean = true - + def filterOutDegree(e: IndexEdge): Boolean = + e.labelIndex.writeOption.fold(true)(_.storeDegree) + (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match { case (true, true) =>
