http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala deleted file mode 100644 index 0b16449..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.s2graph.core.mysqls

import org.apache.s2graph.core.GraphUtil
import scalikejdbc._

import scala.util.{Try, Random}

/**
 * Persistence helpers for rows of the `experiments` table: row mapping,
 * cache-wrapped lookups and insertion.
 */
object Experiment extends Model[Experiment] {
  val ImpressionKey = "S2-Impression-Id"
  val ImpressionId = "Impression-Id"

  /** Maps one result-set row of `experiments` onto an [[Experiment]]. */
  def apply(rs: WrappedResultSet): Experiment = {
    val id = rs.intOpt("id")
    val serviceId = rs.int("service_id")
    val name = rs.string("name")
    val description = rs.string("description")
    val experimentType = rs.string("experiment_type")
    val totalModular = rs.int("total_modular")
    Experiment(id, serviceId, name, description, experimentType, totalModular)
  }

  /** All experiments of a service; the query runs inside `withCaches` under `serviceId=<id>`. */
  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] =
    withCaches(s"serviceId=$serviceId") {
      val query = sql"""select * from experiments where service_id = ${serviceId}"""
      query.map { row => Experiment(row) }.list().apply()
    }

  /** Looks an experiment up by service and name; wrapped in `withCache` under `serviceId=<id>:name=<name>`. */
  def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] =
    withCache(s"serviceId=$serviceId:name=$name") {
      val query = sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
      query.map { row => Experiment(row) }.single.apply
    }

  /** Looks an experiment up by primary key; wrapped in `withCache` under `id=<id>`. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] =
    withCache(s"id=$id") {
      val query = sql"""select * from experiments where id = ${id}"""
      query.map { row => Experiment(row) }.single.apply
    }

  /**
   * Inserts a new experiment for `service` and returns it with the generated key.
   *
   * Every step, including reading `service.id`, runs inside the `Try`, so a
   * service without an id or a failing insert surfaces as a `Failure`.
   */
  def insert(service: Service, name: String, description: String, experimentType: String = "t", totalModular: Int = 100)
            (implicit session: DBSession = AutoSession): Try[Experiment] =
    Try {
      val serviceId = service.id.get
      val generatedKey =
        sql"""INSERT INTO experiments(service_id, service_name, `name`, description, experiment_type, total_modular)
         VALUES(${serviceId}, ${service.serviceName}, $name, $description, $experimentType, $totalModular)"""
          .updateAndReturnGeneratedKey().apply()
      Experiment(Some(generatedKey.toInt), serviceId, name, description, experimentType, totalModular)
    }
}

/** One row of the `experiments` table plus bucket-selection helpers. */
case class Experiment(id: Option[Int],
                      serviceId: Int,
                      name: String,
                      description: String,
                      experimentType: String,
                      totalModular: Int) {

  /** Buckets registered for this experiment; requires `id` to be defined. */
  def buckets = Bucket.finds(id.get)

  /** Pairs every bucket that declares a range with that range. */
  def rangeBuckets = buckets.flatMap { candidate =>
    candidate.rangeOpt.map(span => span -> candidate)
  }

  /**
   * Picks the bucket for a request.
   *
   * An explicit impression id wins. Otherwise a slot in `1..totalModular` is
   * derived from the uuid hash for experiment type "u" and drawn at random for
   * any other type, then resolved through `findBucket(Int)`.
   */
  def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] =
    impIdOpt.fold {
      val slot =
        if (experimentType == "u") (GraphUtil.murmur3(uuid) % totalModular) + 1
        else Random.nextInt(totalModular) + 1
      findBucket(slot)
    } { impressionId => Bucket.findByImpressionId(impressionId) }

  /** First bucket whose inclusive range contains `uuidMod`, if any. */
  def findBucket(uuidMod: Int): Option[Bucket] =
    rangeBuckets.collectFirst {
      case ((lower, upper), matching) if lower <= uuidMod && uuidMod <= upper => matching
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala deleted file mode 100644 index 501a964..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.s2graph.core.mysqls

import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import scalikejdbc.{AutoSession, DBSession, WrappedResultSet}
import scalikejdbc._

/**
 * Persistence helpers for rows of the `global_indices` table: row mapping,
 * cache-wrapped lookups, insertion and index selection for a query.
 */
object GlobalIndex extends Model[GlobalIndex] {
  val vidField = "_vid_"
  val eidField = "_eid_"
  val labelField = "_label_"
  val serviceField = "_service_"
  val serviceColumnField = "_serviceColumn_"
  val EdgeType = "edge"
  val VertexType = "vertex"
  /** Field names that `insert` adds to every index in addition to the caller's properties. */
  val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)

//  val IndexName = "global_indices"
  val VertexIndexName = "global_vertex_index"
  val EdgeIndexName = "global_edge_index"
  val TypeName = "test"

  /** Maps one row of `global_indices`; `prop_names` is split on "," and sorted. */
  def apply(rs: WrappedResultSet): GlobalIndex = {
    GlobalIndex(rs.intOpt("id"),
      rs.string("element_type"),
      rs.string("prop_names").split(",").sorted,
      rs.string("index_name"))
  }

  /**
   * Finds the index with the given element type and name.
   *
   * @param useCache when true the query is wrapped in `withCache` under
   *                 `elementType=<type>:indexName=<name>`; when false it runs directly.
   */
  def findBy(elementType: String, indexName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
    val cacheKey = s"elementType=$elementType:indexName=$indexName"
    // Lazy so the query is only issued when the chosen branch actually forces it.
    lazy val lookup = sql"""select * from global_indices where element_type = ${elementType} and index_name = $indexName""".map { rs => GlobalIndex(rs) }.single.apply()

    if (useCache) withCache(cacheKey){lookup}
    else lookup
  }

  /**
   * Inserts an index definition and returns the generated key.
   *
   * The stored property list is the hidden fields plus `propNames`, sorted and
   * de-duplicated, so a caller that passes a hidden field does not persist it twice.
   */
  def insert(elementType: String, indexName: String, propNames: Seq[String])(implicit session: DBSession = AutoSession): Long = {
    val allPropNames = (hiddenIndexFields.toSeq ++ propNames).distinct.sorted
    sql"""insert into global_indices(element_type, prop_names, index_name)
         values($elementType, ${allPropNames.mkString(",")}, $indexName)"""
      .updateAndReturnGeneratedKey.apply()
  }

  /**
   * Lists every index of an element type.
   *
   * With `useCache` the list goes through `listCache` under
   * `findAll:elementType=<type>`, and each entry is also put into the
   * single-item cache under the key `findBy` uses.
   */
  def findAll(elementType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[GlobalIndex] = {
    lazy val ls = sql"""select * from global_indices where element_type = $elementType""".map { rs => GlobalIndex(rs) }.list.apply
    if (useCache) {
      listCache.withCache(s"findAll:elementType=$elementType") {
        putsToCache(ls.map { globalIndex =>
          val cacheKey = s"elementType=${globalIndex.elementType}:indexName=${globalIndex.indexName}"
          cacheKey -> globalIndex
        })
        ls
      }
    } else {
      ls
    }
  }

  /**
   * Chooses the index that covers the most keys of `hasContainers`.
   *
   * Indices matching no key are discarded; among the rest the highest match
   * count wins, and ties keep the order returned by `findAll` (stable sort).
   */
  def findGlobalIndex(elementType: String, hasContainers: java.util.List[HasContainer])(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._
    val indices = findAll(elementType, useCache = true)
    val keys = hasContainers.asScala.map(_.getKey)

    val ranked = indices.map { index =>
      index -> keys.count(index.propNamesSet)
    }.filter(_._2 > 0).sortBy(-_._2)

    ranked.headOption.map(_._1)
  }

}

/** One row of `global_indices` plus names derived from it. */
case class GlobalIndex(id: Option[Int],
                       elementType: String,
                       propNames: Seq[String],
                       indexName: String)  {
  val backendIndexName = indexName + "_" + elementType
  val backendIndexNameWithType = backendIndexName + "/test1"
  lazy val propNamesSet = propNames.toSet
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala deleted file mode 100644 index c128163..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ /dev/null @@ -1,511 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.s2graph.core.mysqls

import java.util.Calendar

import com.typesafe.config.Config
import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
import scalikejdbc._

/**
 * Persistence helpers for rows of the `labels` table.
 *
 * Single-row lookups are cached under the keys `id=<id>` and `label=<name>`;
 * every method that mutates a row expires exactly those keys.
 */
object Label extends Model[Label] {

  val maxHBaseTableNames = 2

  /** Maps one row of `labels` onto a [[Label]]. */
  def apply(rs: WrappedResultSet): Label = {
    Label(Option(rs.int("id")), rs.string("label"),
      rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
      rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
      rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"),
      rs.string("compressionAlgorithm"), rs.stringOpt("options"))
  }

  /** Deletes the label's metas and indices (looked up without cache), then the label row itself. */
  def deleteAll(label: Label)(implicit session: DBSession) = {
    val id = label.id
    LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) }
    LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) }
    Label.delete(id.get)
  }


  /**
   * Finds a non-deleted label by name.
   *
   * @param useCache when true the query is wrapped in `withCache` under `label=<name>`.
   */
  def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
    val cacheKey = "label=" + labelName
    // Lazy so the query is only issued when the chosen branch forces it.
    lazy val labelOpt =
      sql"""
        select *
        from labels
        where label = ${labelName}
        and deleted_at is null """.map { rs => Label(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(labelOpt)
    else labelOpt
  }

  /** Inserts a raw `labels` row and returns the generated key. No cache is touched. */
  def insert(label: String,
             srcServiceId: Int,
             srcColumnName: String,
             srcColumnType: String,
             tgtServiceId: Int,
             tgtColumnName: String,
             tgtColumnType: String,
             isDirected: Boolean,
             serviceName: String,
             serviceId: Int,
             consistencyLevel: String,
             hTableName: String,
             hTableTTL: Option[Int],
             schemaVersion: String,
             isAsync: Boolean,
             compressionAlgorithm: String,
             options: Option[String])(implicit session: DBSession = AutoSession) = {
    sql"""
        insert into labels(label,
    src_service_id, src_column_name, src_column_type,
    tgt_service_id, tgt_column_name, tgt_column_type,
    is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
    compressionAlgorithm, options)
        values (${label},
    ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
    ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
    ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
      """
      .updateAndReturnGeneratedKey.apply()
  }

  /** Finds a non-deleted label by primary key; wrapped in `withCache` under `id=<id>`. */
  def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
    val cacheKey = "id=" + id
    withCache(cacheKey)(
      sql"""
        select  *
        from  labels
        where  id = ${id}
        and deleted_at is null"""
        .map { rs => Label(rs) }.single.apply())
  }

  /**
   * Same lookup and cache key as `findByIdOpt`, but unwraps the result.
   *
   * Throws `NoSuchElementException` when no such label exists.
   */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Label =
    findByIdOpt(id).get

  /** Non-deleted labels whose target column and service match the given service column. */
  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = "tgtColumnId=" + columnId
    val col = ServiceColumn.findById(columnId)
    withCaches(cacheKey)(
      sql"""
          select  *
          from  labels
          where  tgt_column_name = ${col.columnName}
          and service_id = ${col.serviceId}
          and deleted_at is null
        """.map { rs => Label(rs) }.list().apply())
  }

  /** Non-deleted labels whose source column and service match the given service column. */
  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = "srcColumnId=" + columnId
    val col = ServiceColumn.findById(columnId)
    withCaches(cacheKey)(
      sql"""
          select  *
          from  labels
          where  src_column_name = ${col.columnName}
          and service_id = ${col.serviceId}
          and deleted_at is null
        """.map { rs => Label(rs) }.list().apply())
  }

  /** Non-deleted labels whose source service is `serviceId`. */
  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = "srcServiceId=" + serviceId
    withCaches(cacheKey)(
      sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
    )
  }

  /** Non-deleted labels whose target service is `serviceId`. */
  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = "tgtServiceId=" + serviceId
    withCaches(cacheKey)(
      sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
    )
  }

  /**
   * Creates a label together with its service columns, metas and indices, or
   * returns the existing label of that name.
   *
   * Throws `RuntimeException` when one of the three services does not exist
   * or when an existing service column has a different type than requested.
   */
  def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,
                tgtServiceName: String, tgtColumnName: String, tgtColumnType: String,
                isDirected: Boolean = true,
                serviceName: String,
                indices: Seq[Index],
                metaProps: Seq[Prop],
                consistencyLevel: String,
                hTableName: Option[String],
                hTableTTL: Option[Int],
                schemaVersion: String,
                isAsync: Boolean,
                compressionAlgorithm: String,
                options: Option[String])(implicit session: DBSession = AutoSession): Label = {

    val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
    val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
    val serviceOpt = Service.findByName(serviceName, useCache = false)
    if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.")
    if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.")
    if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.")

    val newLabel = for {
      srcService <- srcServiceOpt
      tgtService <- tgtServiceOpt
      service <- serviceOpt
    } yield {
      val srcServiceId = srcService.id.get
      val tgtServiceId = tgtService.id.get
      val serviceId = service.id.get

      /* insert serviceColumn */
      val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
      val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))

      if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
      if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")

      /* create label */
      Label.findByName(labelName, useCache = false).getOrElse {

        val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
          tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
          hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
          compressionAlgorithm, options).toInt

        // Property name -> meta seq, including the reserved metas, used to resolve index definitions below.
        val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
          val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex)
          (propName -> labelMeta.seq)
        }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap

        if (indices.isEmpty) {
          // make default index with _PK, _timestamp, 0
          LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None)
        } else {
          indices.foreach { index =>
            val metaSeq = index.propNames.map { name => labelMetaMap(name) }
            LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
          }
        }

        // Re-read the stored row and publish it under both lookup keys.
        val cacheKeys = List(s"id=$createdId", s"label=$labelName")
        val ret = findByName(labelName, useCache = false).get
        putsToCache(cacheKeys.map(k => k -> ret))
        ret
      }
    }

    newLabel.getOrElse(throw new RuntimeException("failed to create label"))
  }

  /** Loads every non-deleted label and puts each one into the cache under `id=<id>` and `label=<name>`. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply()

    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      (cacheKey -> x)
    })

    putsToCache(ls.map { x =>
      val cacheKey = s"label=${x.label}"
      (cacheKey -> x)
    })

    ls
  }

  /**
   * Renames a label and returns the number of updated rows.
   *
   * NOTE(review): unlike the other mutators this does not expire any cache
   * key, so cached lookups by the old name may still be served — confirm
   * whether that is intended.
   */
  def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = {
    logger.info(s"rename label: $oldName -> $newName")
    sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply()
  }

  /**
   * Points a label at another HBase table, expires its cache entries and
   * returns the number of updated rows.
   *
   * Throws `NoSuchElementException` when no label of that name exists.
   */
  def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = {
    logger.info(s"update HTable of label $labelName to $newHTableName")
    val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
    val label = Label.findByName(labelName, useCache = false).get

    // Unwrap the Option: interpolating `label.id` directly yields "id=Some(n)",
    // which never matches the "id=n" key the lookups write.
    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
    cnt
  }

  /** Hard-deletes a label row, expires its cache entries and returns the number of deleted rows. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val label = findById(id)
    logger.info(s"delete label: $label")
    val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
    val cacheKeys = List(s"id=$id", s"label=${label.label}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
    cnt
  }

  /**
   * Soft-deletes a label: the row is renamed to `deleted_<millis>_<name>` and
   * `deleted_at` is set. Cache entries for the id and the old name are expired.
   */
  def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = {

    logger.info(s"mark deleted label: $label")
    val oldName = label.label
    val now = Calendar.getInstance().getTime
    val newName = s"deleted_${now.getTime}_"+ label.label
    val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply()
    // Unwrap the Option so the key matches the "id=n" form used by the lookups.
    val cacheKeys = List(s"id=${label.id.get}", s"label=${oldName}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
    cnt
  }
}

/**
 * One row of the `labels` table plus lazily resolved related models
 * (services, columns, metas, indices) and conversions built on them.
 *
 * Most derived members call `id.get`, so they require a persisted label.
 */
case class Label(id: Option[Int], label: String,
                 srcServiceId: Int, srcColumnName: String, srcColumnType: String,
                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
                 isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
                 hTableName: String, hTableTTL: Option[Int],
                 schemaVersion: String, isAsync: Boolean = false,
                 compressionAlgorithm: String,
                 options: Option[String]) {
  def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, useCache = useCache)

  def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, useCache = useCache)

  //  lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
  lazy val srcService = Service.findById(srcServiceId)
  lazy val tgtService = Service.findById(tgtServiceId)
  lazy val service = Service.findById(serviceId)
  /**
   * TODO
   * change this to apply hbase table from target serviceName
   */
  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
  //  lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
  // Uses the first comma-separated entry of hTableName as the table name.
  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head)

  lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
  lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))

  lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)

  //TODO: Make sure this is correct

//  lazy val metas = metas(useCache = true)
  lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
  lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
  lazy val labelMetaSet = labelMetas.toSet
  lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap

  lazy val indicesMap = indices.map(idx => (idx.seq, idx)).toMap
  lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)).toMap
  lazy val indexNameMap = indices.map(idx => (idx.name, idx)).toMap
  // Every index except the default one; empty when no default index exists.
  lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get)
  //      indices filterNot (_.id.get == defaultIndex.get.id.get)
  lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)).toMap

  // Reserved metas, with `to` / `from` retyped to this label's column types, followed by the label's own metas.
  lazy val metaProps = LabelMeta.reservedMetas.map { m =>
    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
    else m
  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)

  // Same construction as metaProps, but starting from LabelMeta.reservedMetasInner.
  lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
    else m
  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)

  lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
  lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
  lazy val metaPropNames = metaProps.map(x => x.name)
  lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)).toMap

  /** this is used only by edgeToProps */
  lazy val metaPropsDefaultMap = (for {
    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
  } yield prop.name -> jsValue).toMap

  lazy val metaPropsDefaultMapInnerString = (for {
    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
  } yield prop.name -> innerVal).toMap

  lazy val metaPropsDefaultMapInner = (for {
    prop <- metaPropsInner
    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
  } yield prop -> innerVal).toMap
  lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
  lazy val metaPropsJsValueWithDefault = (for {
    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
  } yield prop -> jsValue).toMap
//  lazy val extraOptions = Model.extraOptions(Option("""{
//    "storage": {
//      "s2graph.storage.backend": "rocks",
//      "rocks.db.path": "/tmp/db"
//    }
//  }"""))

  // The "tokens" option as a set of strings; anything but a JSON array is logged and treated as empty.
  lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) {
    case JsArray(values) => values.map(_.as[String]).toSet
    case _ =>
      logger.error("Invalid token JSON")
      Set.empty[String]
  }

  lazy val extraOptions = Model.extraOptions(options)

  // Defaults to true when the "durability" option is absent.
  lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)

  lazy val storageConfigOpt: Option[Config] = toStorageConfig

  def toStorageConfig: Option[Config] = {
    Model.toStorageConfig(extraOptions)
  }


  /** Source column for direction 0, target column for any other direction. */
  def srcColumnWithDir(dir: Int) = {
    // GraphUtil.directions("out"
    if (dir == 0) srcColumn else tgtColumn
  }

  /** Target column for direction 0, source column for any other direction. */
  def tgtColumnWithDir(dir: Int) = {
    // GraphUtil.directions("out"
    if (dir == 0) tgtColumn else srcColumn
  }

  lazy val tgtSrc = (tgtColumn, srcColumn)
  lazy val srcTgt = (srcColumn, tgtColumn)

  /** (target, source) for direction 1, (source, target) otherwise. */
  def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt

  lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0))
//  def init() = {
//    metas()
//    metaSeqsToNames()
//    service
//    srcColumn
//    tgtColumn
//    defaultIndex
//    indices
//    metaProps
//  }

  //  def srcColumnInnerVal(jsValue: JsValue) = {
  //    jsValueToInnerVal(jsValue, srcColumnType, version)
  //  }
  //  def tgtColumnInnerVal(jsValue: JsValue) = {
  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
  //  }

  // NOTE(review): this looks the label's metas up on every call and needs `id` to be defined.
  override def toString(): String = {
    val orderByKeys = LabelMeta.findAllByLabelId(id.get)
    super.toString() + orderByKeys.toString()
  }

  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
  //    if (scoring.isEmpty) LabelIndex.defaultSeq
  //    else {
  //      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
  //
  ////      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
  //    }
  //  }

  /**
   * JSON description of the label, built from uncached index and meta lookups.
   *
   * Token values under `options.tokens` are masked with `*` of equal length;
   * options that fail to parse are rendered as an empty object.
   */
  lazy val toJson = {
    val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false)
    val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq, useCache = false)
    val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && idx.id.get != defaultIdxOpt.get.id.get)
    // Deliberately local: rebuilt without cache instead of reusing the cached class member of the same name.
    val metaProps = LabelMeta.reservedMetas.map { m =>
      if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
      else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
      else m
    } ::: LabelMeta.findAllByLabelId(id.get, useCache = false)

    val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj())
    val optionsJs = try {
      val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject]
      if (!obj.value.contains("tokens")) obj
      else obj ++ Json.obj("tokens" -> obj.value("tokens").as[Seq[String]].map("*" * _.length))

    } catch { case e: Exception => Json.obj() }

    Json.obj("labelName" -> label,
      "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
      "isDirected" -> isDirected,
      "serviceName" -> serviceName,
      "consistencyLevel" -> consistencyLevel,
      "schemaVersion" -> schemaVersion,
      "isAsync" -> isAsync,
      "compressionAlgorithm" -> compressionAlgorithm,
      "defaultIndex" -> defaultIdx,
      "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson),
      "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson),
      "options" -> optionsJs
    )
  }

  /**
   * Converts raw property values into inner values stamped with `ts`.
   *
   * Properties whose name is not a known meta of this label are dropped.
   */
  def propsToInnerValsWithTs(props: Map[String, Any],
                             ts: Long = System.currentTimeMillis()): Map[LabelMeta, InnerValLikeWithTs] = {
    for {
      (k, v) <- props
      labelMeta <- metaPropsInvMap.get(k)
      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
    } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)

  }

  /**
   * Converts inner values back into plain property values.
   *
   * With an empty `selectColumns` every default is returned, overridden by
   * `props`. Otherwise only the selected seqs are returned (the `to` / `from`
   * seqs are skipped), each falling back to its default.
   */
  def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs],
                             selectColumns: Map[Byte, Boolean]): Map[String, Any] = {
    if (selectColumns.isEmpty) {
      for {
        (meta, v) <- metaPropsDefaultMapInner ++ props
      } yield {
        meta.name -> innerValToAny(v.innerVal, meta.dataType)
      }
    } else {
      for {
        (k, _) <- selectColumns
        if k != LabelMeta.toSeq && k != LabelMeta.fromSeq
        labelMeta <- metaPropsMap.get(k)
      } yield {
        // NOTE(review): throws if the meta has neither a value in `props` nor a default.
        val v = props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get
        labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType)
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala deleted file mode 100644 index 1da0e55..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.s2graph.core.mysqls

import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.mysqls.LabelIndex.LabelIndexMutateOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._

/**
 * Data-access object for the `label_indices` table, with lookups memoised
 * through the option/list caches inherited from [[Model]].
 *
 * Cache-key conventions used throughout this object (they must agree between
 * the finders, `findAll` and the invalidation code):
 *  - `id=<id>`
 *  - `labelId=<labelId>`                                  (list cache)
 *  - `labelId=<labelId>:seq=<seq>`
 *  - `labelId=<labelId>:seqs=<comma-joined metaSeqs>:dir=<Option toString>`
 */
object LabelIndex extends Model[LabelIndex] {
  val DefaultName = "_PK"
  val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
  val DefaultSeq = 1.toByte
  val MaxOrderSeq = 7

  /** Builds a [[LabelIndex]] from a `label_indices` row; `meta_seqs` is a comma-separated byte list. */
  def apply(rs: WrappedResultSet): LabelIndex = {
    val metaSeqs = rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList

    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
      metaSeqs,
      rs.string("formulars"),
      rs.intOpt("dir"),
      rs.stringOpt("options")
    )
  }

  /**
   * Per-direction mutation option parsed from the index `options` JSON
   * (see `LabelIndex.parseOption`).
   */
  case class LabelIndexMutateOption(dir: Byte,
                                    method: String,
                                    rate: Double,
                                    totalModular: Long,
                                    storeDegree: Boolean) {

    val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"

    /**
     * Decides whether an element passes the configured sampling method.
     *
     * @param a       the element being considered (not inspected here)
     * @param hashOpt hash value required by the "hash_sample" method
     * @return false for "drop"; a random draw against `rate` for "sample";
     *         a hash-bucket test against `rate` for "hash_sample"; true otherwise
     * @throws RuntimeException when the method is "hash_sample" and `hashOpt` is empty
     */
    def sample[T](a: T, hashOpt: Option[Long]): Boolean = method match {
      case "drop" => false
      case "sample" => scala.util.Random.nextDouble() < rate
      case "hash_sample" =>
        val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
        (hash.abs % totalModular) / totalModular.toDouble < rate
      case _ => true
    }
  }

  /** Looks up an index by primary key (cached); throws if no such row exists. */
  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
    val cacheKey = "id=" + id
    withCache(cacheKey) {
      sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
    }.get
  }

  /** Returns every index of a label with `seq > 0`, ordered by `seq`. */
  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = "labelId=" + labelId
    if (useCache) {
      withCaches(cacheKey)( sql"""
        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
      """.map { rs => LabelIndex(rs) }.list.apply)
    } else {
      sql"""
        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
      """.map { rs => LabelIndex(rs) }.list.apply
    }
  }

  /** Inserts a new row and returns its generated key. */
  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
             direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
    sql"""
      insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
      values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
    """
      .updateAndReturnGeneratedKey.apply()
  }

  /**
   * Returns the index matching (labelId, metaSeqs, direction), creating it with
   * the next free `seq` when it does not exist yet.
   *
   * After an insert every cache entry that could hold a stale answer is
   * expired -- including the negative result cached by `findByLabelIdAndSeqs`
   * and the per-label list -- and the new row is read back bypassing the cache.
   */
  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
                   direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
    findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
      case Some(s) => s
      case None =>
        val orders = findByLabelIdAll(labelId, useCache = false)
        val seq = (orders.size + 1).toByte
        assert(seq <= MaxOrderSeq)
        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)

        // Keys must be spelled exactly as the finders build them (comma-joined seqs).
        val seqsKey = metaSeqs.mkString(",")
        val cacheKeys = List(
          s"id=$createdId",
          s"labelId=$labelId",
          s"labelId=$labelId:seq=$seq",
          s"labelId=$labelId:seqs=$seqsKey:dir=$direction")

        cacheKeys.foreach { key =>
          expireCache(key)
          expireCaches(key)
        }

        findByLabelIdAndSeq(labelId, seq, useCache = false).get
    }
  }

  /**
   * Finds the index of a label with exactly the given meta sequences and direction (cached).
   *
   * A missing direction is matched with `dir is null`; binding `None` into
   * `dir = ?` would compare against NULL and never match any row.
   */
  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
    val dirCondition = direction.fold(sqls"dir is null")(d => sqls"dir = ${d}")
    withCache(cacheKey) {
      sql"""
        select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and ${dirCondition}
      """.map { rs => LabelIndex(rs) }.single.apply
    }
  }

  /** Finds the index of a label at the given `seq`, optionally through the cache. */
  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
    val cacheKey = "labelId=" + labelId + ":seq=" + seq
    if (useCache) {
      withCache(cacheKey)( sql"""
        select * from label_indices where label_id = ${labelId} and seq = ${seq}
      """.map { rs => LabelIndex(rs) }.single.apply)
    } else {
      sql"""
        select * from label_indices where label_id = ${labelId} and seq = ${seq}
      """.map { rs => LabelIndex(rs) }.single.apply
    }
  }

  /** Deletes the index with the given id and expires every cache key that referred to it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val labelIndex = findById(id)
    val seqs = labelIndex.metaSeqs.mkString(",")
    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
    sql"""delete from label_indices where id = ${id}""".execute.apply()

    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /**
   * Loads every row and pre-populates the caches under the same keys the
   * finders use.
   *
   * @return the loaded rows
   */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply

    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      (cacheKey -> x)
    })
    putsToCache(ls.map { x =>
      // Must match findByLabelIdAndSeq's key exactly (no trailing brace).
      val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
      (cacheKey -> x)
    })
    putsToCache(ls.map { x =>
      val seqs = x.metaSeqs.mkString(",")
      val cacheKey = s"labelId=${x.labelId}:seqs=$seqs:dir=${x.dir}"
      (cacheKey -> x)
    })
    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
      val cacheKey = s"labelId=${labelId}"
      (cacheKey -> ls)
    }.toList)

    ls
  }
}

/**
mgmt.buildIndex('nameAndAge',Vertex.class)
.addKey(name,Mapping.TEXT.getParameter())
.addKey(age,Mapping.TEXT.getParameter())
.buildMixedIndex("search")

v: {name: abc}
  E1: {age: 20}, E2, E3....

Management.createServiceColumn(
  serviceName = serviceName, columnName = "person", columnType = "integer",
  props = Seq(
    Prop("name", "-", "string"),
    Prop("age", "0", "integer"),
    Prop("location", "-", "string")
  )
)

management.createLabel(
  label = "bought",
  srcServiceName = serviceName, srcColumnName = "person", srcColumnType = "integer",
  tgtServiceName = serviceName, tgtColumnName = "product", tgtColumnType = "integer", idDirected = true,
  serviceName = serviceName,
  indices = Seq(
    Index("PK", Seq("amount", "created_at"), IndexType("mixed", propsMapping: Map[String, String]),
{"in": {}, "out": {}})
  ),
  props = Seq(
    Prop("amount", "0.0", "double"),
    Prop("created_at", "2000-01-01", "string")
  ),
  consistencyLevel = "strong"
)

mgmt.buildIndex('PK', Edge.class)
  .addKey(amount, Double)
  .buildCompositeIndex

*/
case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
                      dir: Option[Int], options: Option[String]) {
  // both
  lazy val label = Label.findById(labelId)
  lazy val metas = label.metaPropsMap
  lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
  lazy val sortKeyTypesArray = sortKeyTypes.toArray
  lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }

  /** JSON view of this index; an unparsable `options` string is rendered as an empty object. */
  lazy val toJson = {
    val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
    val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }

    Json.obj(
      "name" -> name,
      "propNames" -> sortKeyTypes.map(x => x.name),
      "dir" -> dirJs,
      "options" -> optionsJs
    )
  }

  /**
   * Parses the mutation option stored under `dir` in the `options` JSON.
   *
   * Missing fields fall back to: method "default", rate 1.0, totalModular 100,
   * storeDegree true.
   *
   * @param dir the JSON field to read and the direction name passed to `GraphUtil.directions`
   * @return None when `options` is empty or when parsing fails (the failure is logged)
   */
  def parseOption(dir: String): Option[LabelIndexMutateOption] = try {
    options.map { string =>
      val jsObj = Json.parse(string) \ dir

      val method = (jsObj \ "method").asOpt[String].getOrElse("default")
      val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
      val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
      val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)

      LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
    }
  } catch {
    case e: Exception =>
      logger.error(s"Parse failed labelOption: ${this.label}", e)
      None
  }

  lazy val inDirOption = parseOption("in")

  lazy val outDirOption = parseOption("out")
}
import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException
import org.apache.s2graph.core.{GraphExceptions, JSONParser}
import play.api.libs.json.Json
import scalikejdbc._

import scala.util.Try

/**
 * Data-access object for the `label_metas` table plus the reserved
 * (built-in) meta definitions. Lookups are memoised through the caches
 * inherited from [[Model]].
 */
object LabelMeta extends Model[LabelMeta] {

  /** dummy sequences */

  val fromSeq = (-4).toByte
  val toSeq = (-5).toByte
  val lastOpSeq = (-3).toByte
  val lastDeletedAtSeq = (-2).toByte
  val timestampSeq = (0).toByte
  val labelSeq = (-6).toByte
  val directionSeq = (-7).toByte
  val fromHashSeq = (-8).toByte

  val countSeq = (Byte.MaxValue - 2).toByte
  val degreeSeq = (Byte.MaxValue - 1).toByte
  val maxValue = Byte.MaxValue
  val emptySeq = Byte.MaxValue

  /** reserved sequences */
  //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
  //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
  val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = "_from_hash",
    seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long")
  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string")
  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
    seq = toSeq, defaultValue = toSeq.toString, dataType = "string")
  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
    seq = timestampSeq, defaultValue = "0", dataType = "long")
  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
    seq = degreeSeq, defaultValue = "0", dataType = "long")
  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
    seq = countSeq, defaultValue = "-1", dataType = "long")
  val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = "_lastDeletedAt",
    seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long")
  val label = LabelMeta(id = Some(-1), labelId = -1, name = "label",
    seq = labelSeq, defaultValue = "", dataType = "string")
  val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction",
    seq = directionSeq, defaultValue = "out", dataType = "string")
  val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty",
    seq = emptySeq, defaultValue = "-1", dataType = "long")

  // Each reserved column(_timestamp, timestamp) has same seq number, starts with '_' has high priority
  val reservedMetas = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse
  val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count)
  val reservedMetaNamesSet = reservedMetasInner.map(_.name).toSet

  val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp")

  /** Builds a [[LabelMeta]] from a `label_metas` row; `data_type` is lower-cased. */
  def apply(rs: WrappedResultSet): LabelMeta = {
    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
      rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index"))
  }

  /** Note: degreeSeq should not be included in serializer/deserializer.
    * Only 0 <= seq <= countSeq (Byte.MaxValue - 2), not degreeSeq (Byte.MaxValue - 1),
    * should be included in actual bytes in storage.
    * */
  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq == fromHashSeq

  /** Like [[isValidSeq]] but excluding both ends (the timestamp seq 0 and countSeq). */
  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq

  /** Looks up a meta by primary key (cached); throws if no such row exists. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
    val cacheKey = "id=" + id

    withCache(cacheKey) {
      sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
    }.get
  }

  /** Returns every meta of a label ordered by `seq`, optionally through the list cache. */
  def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
    val cacheKey = "labelId=" + labelId
    lazy val labelMetas = sql"""select *
    		  						from label_metas
    		  						where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()

    if (useCache) withCaches(cacheKey)(labelMetas)
    else labelMetas
  }

  /**
   * Finds a meta of a label by name. The reserved names `_timestamp`, `_from`
   * and `_to` are answered from the built-in definitions without touching the
   * database.
   */
  def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = {
    name match {
      case timestamp.name => Some(timestamp)
      case from.name => Some(from)
      case to.name => Some(to)
      case _ =>
        val cacheKey = "labelId=" + labelId + ":name=" + name
        lazy val labelMeta = sql"""
            select *
            from label_metas where label_id = ${labelId} and name = ${name}"""
          .map { rs => LabelMeta(rs) }.single.apply()

        if (useCache) withCache(cacheKey)(labelMeta)
        else labelMeta
    }
  }

  /**
   * Inserts a new meta for the label, assigning `seq` as (current number of metas + 1).
   *
   * @return the generated key of the new row
   * @throws MaxPropSizeReachedException when the next seq would reach `maxValue`
   */
  def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
    val ls = findAllByLabelId(labelId, useCache = false)
    val seq = ls.size + 1

    if (seq < maxValue) {
      sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index)
    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
    } else {
      throw MaxPropSizeReachedException("max property size reached")
    }
  }

  /** Returns the named meta of the label, inserting it (and expiring the affected cache keys) when absent. */
  def findOrInsert(labelId: Int,
                   name: String,
                   defaultValue: String,
                   dataType: String,
                   storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = {

    findByName(labelId, name) match {
      case Some(c) => c
      case None =>
        insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
        val cacheKey = "labelId=" + labelId + ":name=" + name
        val cacheKeys = "labelId=" + labelId
        expireCache(cacheKey)
        expireCaches(cacheKeys)
        findByName(labelId, name, useCache = false).get
    }
  }

  /**
   * Deletes the meta with the given id and expires every cache key that can
   * refer to it, including the `labelId=…:seq=…` key populated by [[findAll]].
   */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val labelMeta = findById(id)
    val (labelId, name, seq) = (labelMeta.labelId, labelMeta.name, labelMeta.seq)
    sql"""delete from label_metas where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name", s"labelId=$labelId:seq=$seq")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Loads every row, pre-populates the caches, and returns the rows. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      cacheKey -> x
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
      cacheKey -> x
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
      cacheKey -> x
    })

    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
      val cacheKey = s"labelId=${labelId}"
      cacheKey -> ls
    }.toList)

    ls
  }

  /**
   * Updates the `store_in_global_index` flag of one meta.
   *
   * An UPDATE produces no generated key, so the statement is run as a plain
   * update instead of `updateAndReturnGeneratedKey`.
   *
   * @return the number of affected rows, or the failure raised by the database
   */
  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
    sql"""
          update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
       """.update.apply().toLong
  }
}

/**
 * One property definition of a label.
 *
 * Equality and hashing deliberately consider `seq` only (the `labelId`
 * comparison is commented out in the original source).
 */
case class LabelMeta(id: Option[Int],
                     labelId: Int,
                     name: String,
                     seq: Byte,
                     defaultValue: String,
                     dataType: String,
                     storeInGlobalIndex: Boolean = false) {

  lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType, "storeInGlobalIndex" -> storeInGlobalIndex)

  override def equals(other: Any): Boolean = other match {
    //      labelId == o.labelId &&
    case o: LabelMeta => seq == o.seq
    case _ => false
  }

  override def hashCode(): Int = seq.toInt
  //    (labelId, seq).hashCode()
}
package org.apache.s2graph.core.mysqls

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
import play.api.libs.json.{JsObject, JsValue, Json}
import scalikejdbc._

import scala.concurrent.ExecutionContext
import scala.io.Source
import scala.language.{higherKinds, implicitConversions}
import scala.util.{Failure, Success, Try}

/**
 * Process-wide setup for the metadata models: connection pool, schema
 * bootstrap, transactions and cache loading/clearing.
 */
object Model {
  var maxSize = 10000
  var ttl = 60
  val numOfThread = Runtime.getRuntime.availableProcessors()
  val threadPool = Executors.newFixedThreadPool(numOfThread)
  val ec = ExecutionContext.fromExecutor(threadPool)
  val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"

  private val ModelReferenceCount = new AtomicLong(0L)

  /**
   * Initialises cache settings and the singleton connection pool from
   * `config`, bootstraps the schema if needed and bumps the reference count.
   *
   * @return the reference count after incrementing
   */
  def apply(config: Config) = {
    maxSize = config.getInt("cache.max.size")
    ttl = config.getInt("cache.ttl.seconds")
    Class.forName(config.getString("db.default.driver"))

    val settings = ConnectionPoolSettings(
      initialSize = 10,
      maxSize = 10,
      connectionTimeoutMillis = 30000L,
      validationQuery = "select 1;")

    ConnectionPool.singleton(
      config.getString("db.default.url"),
      config.getString("db.default.user"),
      config.getString("db.default.password"),
      settings)

    checkSchema()

    ModelReferenceCount.incrementAndGet()
  }

  /**
   * Creates the tables from the bundled `schema.sql` when the database has no
   * tables at all.
   *
   * @throws RuntimeException when the tables cannot be listed, the schema
   *                          resource is missing, or the import fails
   */
  def checkSchema(): Unit = {
    withTx { implicit session =>
      sql"""show tables""".map(rs => rs.string(1)).list.apply()
    } match {
      case Success(tables) =>
        if (tables.isEmpty) {
          // this is a very simple migration tool that only supports creating
          // appropriate tables when there are no tables in the database at all.
          // Ideally, it should be improved to a sophisticated migration tool
          // that supports versioning, etc.
          logger.info("Creating tables ...")
          val statements = readSchemaStatements()
          withTx { implicit session =>
            statements.foreach(sql => session.execute(sql))
          } match {
            case Success(_) =>
              logger.info("Successfully imported schema")
            case Failure(e) =>
              throw new RuntimeException("Error while importing schema", e)
          }
        }
      case Failure(e) =>
        throw new RuntimeException("Could not list tables in the database", e)
    }
  }

  /** Reads `schema.sql`, strips `-- ` comments and splits it into statements; always closes the stream. */
  private def readSchemaStatements(): Array[String] = {
    val schema = Option(getClass.getResourceAsStream("schema.sql"))
      .getOrElse(throw new RuntimeException("schema.sql was not found on the classpath"))
    try {
      val lines = Source.fromInputStream(schema, "UTF-8").getLines
      // split can return an empty array (a line that is exactly "-- "), so avoid .head
      val sources = lines.map(_.split("-- ").headOption.getOrElse("").trim).mkString("\n")
      sources.split(";\n")
    } finally {
      schema.close()
    }
  }

  /**
   * Runs `block` inside a transaction on a freshly borrowed connection,
   * committing on success and rolling back when an Exception is thrown.
   */
  def withTx[T](block: DBSession => T): Try[T] = {
    using(DB(ConnectionPool.borrow())) { conn =>
      Try {
        conn.begin()
        val session = conn.withinTxSession()
        val result = block(session)

        conn.commit()

        result
      } recoverWith {
        case e: Exception =>
          conn.rollbackIfActive()
          Failure(e)
      }
    }
  }

  /**
   * Decrements the reference count and, once it reaches zero, optionally
   * truncates every table, clears the caches and closes all connection pools.
   */
  def shutdown(modelDataDelete: Boolean = false) =
    if (ModelReferenceCount.decrementAndGet() <= 0) {
      // FIXME: When Model is served by embedded database and deleteData is set, Model deletes
      // the underlying database. Its purpose is clearing runtime footprint when running tests.
      if (modelDataDelete) {
        withTx { implicit session =>
          sql"SHOW TABLES"
            .map(rs => rs.string(1))
            .list
            .apply()
            .map { table => s"TRUNCATE TABLE $table" }
        } match {
          case Success(stmts) =>
            val newStmts = List("SET FOREIGN_KEY_CHECKS = 0") ++ stmts ++ List("SET FOREIGN_KEY_CHECKS = 1")
            withTx { implicit session =>
              newStmts.foreach { stmt =>
                session.execute(stmt)
              }
            } match {
              case Success(_) =>
                logger.info(s"Success to truncate models: $stmts")
              case Failure(e) =>
                throw new IllegalStateException(s"Failed to truncate models", e)
            }
          case Failure(e) =>
            throw new IllegalStateException(s"Failed to list models", e)
        }
      }
      clearCache()
      ConnectionPool.closeAll()
    }

  /** Warms the caches of every model by loading all of their rows. */
  def loadCache() = {
    Service.findAll()
    ServiceColumn.findAll()
    Label.findAll()
    LabelMeta.findAll()
    LabelIndex.findAll()
    ColumnMeta.findAll()
  }

  /** Drops every cached entry of every model. */
  def clearCache() = {
    Service.expireAll()
    ServiceColumn.expireAll()
    Label.expireAll()
    LabelMeta.expireAll()
    LabelIndex.expireAll()
    ColumnMeta.expireAll()
  }

  /** Parses an optional JSON-object string into its top-level fields; anything unparsable yields an empty map. */
  def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
    case None => Map.empty
    case Some(v) =>
      try {
        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
      } catch {
        case e: Exception =>
          logger.error(s"An error occurs while parsing the extra label option", e)
          Map.empty
      }
  }

  /** Builds a typesafe `Config` from the "storage" entry of `options`; returns None when absent or invalid. */
  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
    try {
      options.get("storage").map { jsValue =>
        import scala.collection.JavaConverters._
        val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
          key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!"))
        }
        ConfigFactory.parseMap(configMap.asJava)
      }
    } catch {
      case e: Exception =>
        logger.error(s"toStorageConfig error. use default storage", e)
        None
    }
  }
}

/**
 * Base trait of every model companion: provides an option cache (single-row
 * lookups) and a list cache (multi-row lookups), both keyed by string.
 */
trait Model[V] extends SQLSyntaxSupport[V] {

  import Model._

  implicit val ec: ExecutionContext = Model.ec

  val cName = this.getClass.getSimpleName()
  logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")

  val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
  val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)

  val withCache = optionCache.withCache _

  val withCaches = listCache.withCache _

  val expireCache = optionCache.invalidate _

  val expireCaches = listCache.invalidate _

  /** Invalidates both caches entirely. */
  def expireAll() = {
    listCache.invalidateAll()
    optionCache.invalidateAll()
  }

  /** Stores each value in the option cache under its key. */
  def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
    case (key, value) => optionCache.put(key, Option(value))
  }

  /** Stores each list in the list cache under its key. */
  def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
    case (key, values) => listCache.put(key, values)
  }

  /** Snapshot of both caches: (option-cache entries, list-cache entries). */
  def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) = {
    (optionCache.getAllData(), listCache.getAllData())
  }
}
package org.apache.s2graph.core.mysqls

import java.util.UUID

import com.typesafe.config.Config
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.Json
import scalikejdbc._

/**
 * Data-access object for the `services` table, with lookups memoised through
 * the option cache inherited from [[Model]].
 */
object Service extends Model[Service] {
  /** Builds a [[Service]] from a `services` row (the `options` field keeps its default). */
  def valueOf(rs: WrappedResultSet): Service = {
    Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
      rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
  }

  /** Finds a service by its access token (cached). */
  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = s"accessToken=$accessToken"
    withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply)
  }

  /** Looks up a service by primary key (cached); throws if no such row exists. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
    val cacheKey = "id=" + id
    withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply).get
  }

  /** Finds a service by name, optionally through the cache. */
  def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = "serviceName=" + serviceName
    lazy val serviceOpt = sql"""
        select * from services where service_name = ${serviceName}
      """.map { rs => Service.valueOf(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(serviceOpt)
    else serviceOpt
  }

  /**
   * Inserts a new service with a freshly generated random access token.
   * `compressionAlgorithm` is logged but not written to the table.
   */
  def insert(serviceName: String, cluster: String,
             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
    val accessToken = UUID.randomUUID().toString()
    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
    values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
  }

  /**
   * Deletes the service with the given id and expires its cache entries.
   *
   * The row is removed from `services`; the previous statement targeted
   * `service_columns`, deleting an unrelated column row that happened to share
   * the id while leaving the service itself in place.
   */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val service = findById(id)
    val serviceName = service.serviceName
    sql"""delete from services where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Returns the named service, inserting it (and expiring its name cache key) when absent. */
  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
                   preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Service = {
    findByName(serviceName, useCache) match {
      case Some(s) => s
      case None =>
        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
        val cacheKey = "serviceName=" + serviceName
        expireCache(cacheKey)
        findByName(serviceName).get
    }
  }

  /** Loads every service, pre-populates the id and name caches, and returns the rows. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      (cacheKey -> x)
    })

    putsToCache(ls.map { x =>
      val cacheKey = s"serviceName=${x.serviceName}"
      (cacheKey -> x)
    })

    ls
  }

  /** Returns the distinct `cluster` values of all services. */
  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
    sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply
  }
}

/** One row of the `services` table plus optional extra options as a JSON string. */
case class Service(id: Option[Int],
                   serviceName: String,
                   accessToken: String,
                   cluster: String,
                   hTableName: String,
                   preSplitSize: Int,
                   hTableTTL: Option[Int],
                   options: Option[String] = None) {
  /** JSON view of the service; an unsaved service (no id) renders as an empty object. */
  lazy val toJson =
    id match {
      case Some(_id) =>
        Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
          "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
      case None =>
        Json.parse("{}")
    }

  lazy val extraOptions = Model.extraOptions(options)
  lazy val storageConfigOpt: Option[Config] = toStorageConfig
  def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
  def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
}
package org.apache.s2graph.core.mysqls

import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
import play.api.libs.json.Json
import scalikejdbc._

/**
 * Data-access object for the `service_columns` table, with lookups memoised
 * through the caches inherited from [[Model]].
 *
 * Cache keys: `id=<id>`, `serviceId=<serviceId>` (list cache) and
 * `serviceId=<serviceId>:columnName=<columnName>`.
 */
object ServiceColumn extends Model[ServiceColumn] {
  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")

  /** Builds a [[ServiceColumn]] from a `service_columns` row; `column_type` is lower-cased. */
  def valueOf(rs: WrappedResultSet): ServiceColumn = {
    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
  }

  /** Returns every column of a service, optionally through the list cache. */
  def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = {
    val cacheKey = "serviceId=" + serviceId

    lazy val sql = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply()

    if (useCache) withCaches(cacheKey)(sql)
    else sql
  }

  /** Looks up a column by primary key; throws if no such row exists. */
  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
    val cacheKey = "id=" + id

    if (useCache) {
      withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply).get
    } else {
      sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply.get
    }
  }

  /** Finds a column by (serviceId, columnName), optionally through the cache. */
  def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
    val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
    if (useCache) {
      withCache(cacheKey) {
        sql"""
          select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
        """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
      }
    } else {
      sql"""
        select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
      """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
    }
  }

  /** Inserts a new column row. */
  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
  }

  /**
   * Deletes the column with the given id and expires every cache key that can
   * refer to it, including the per-service list read by [[findByServiceId]].
   */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val serviceColumn = findById(id, useCache = false)
    val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName)
    sql"""delete from service_columns where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"serviceId=$serviceId", s"serviceId=$serviceId:columnName=$columnName")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /**
   * Returns the named column of the service, inserting it when absent.
   *
   * After an insert both the (serviceId, columnName) entry and the per-service
   * list are expired, so [[findByServiceId]] does not keep serving a list that
   * lacks the new column.
   */
  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
    find(serviceId, columnName, useCache) match {
      case Some(sc) => sc
      case None =>
        insert(serviceId, columnName, columnType, schemaVersion)
        val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
        expireCache(cacheKey)
        expireCaches("serviceId=" + serviceId)
        find(serviceId, columnName).get
    }
  }

  /** Loads every column, pre-populates the single-row caches, and returns the rows. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) }.list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      (cacheKey -> x)
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
      (cacheKey -> x)
    })
    ls
  }
}

/** One row of the `service_columns` table, with lazily resolved service and column metas. */
case class ServiceColumn(id: Option[Int],
                         serviceId: Int,
                         columnName: String,
                         columnType: String,
                         schemaVersion: String) {

  lazy val service = Service.findById(serviceId)
  // Both meta lists are bracketed by the built-in timestamp and lastModifiedAt metas.
  lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn
  lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn
  lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta }.toMap
  lazy val metasInvMap = metas.map { meta => meta.name -> meta }.toMap
  lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)).toMap
  lazy val metaPropsDefaultMap = metas.map { meta =>
    meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion)
  }.toMap
  lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)

  /** Converts named properties to inner values; names without a matching meta are dropped. */
  def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = {
    val ret = for {
      (k, v) <- props
      labelMeta <- metasInvMap.get(k)
      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
    } yield labelMeta -> innerVal

    ret
  }

  /** Converts seq-keyed inner values back to named raw values; unknown seqs are dropped. */
  def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {
    for {
      (k, v) <- props
      columnMeta <- metasMap.get(k)
    } yield {
      columnMeta.name -> v.value
    }
  }

  /** Same as [[innerValsToProps]] for timestamped inner values. */
  def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = {
    for {
      (k, v) <- props
      columnMeta <- metasMap.get(k)
    } yield {
      columnMeta.name -> v.innerVal.value
    }
  }
}
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.mysqls - -import scalikejdbc.{AutoSession, DBSession, WrappedResultSet, _} - -object ServiceColumnIndex extends Model[ServiceColumnIndex] { - val dbTableName = "service_column_indices" - val DefaultName = "_PK" - val DefaultSeq = 1.toByte - val MaxOrderSeq = 7 - - def apply(rs: WrappedResultSet): ServiceColumnIndex = { - ServiceColumnIndex(rs.intOpt("id"), rs.int("service_id"), rs.int("service_column_id"), - rs.string("name"), - rs.byte("seq"), rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match { - case metaSeqsList => metaSeqsList - }, - rs.stringOpt("options") - ) - } - - def findById(id: Int)(implicit session: DBSession = AutoSession) = { - val cacheKey = "id=" + id - lazy val sql = sql"""select * from $dbTableName where id = ${id}""" - withCache(cacheKey) { - sql.map { rs => ServiceColumnIndex(rs) }.single.apply - }.get - } - - def findBySeqs(serviceId: Int, serviceColumnId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[ServiceColumnIndex] = { - val 
cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seqs=" + seqs.mkString(",") - lazy val sql = - sql""" - select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and meta_seqs = ${seqs.mkString(",")} - """ - withCache(cacheKey) { - sql.map { rs => ServiceColumnIndex(rs) }.single.apply - } - } - - def findBySeq(serviceId: Int, - serviceColumnId: Int, - seq: Byte, - useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { - val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seq=" + seq - lazy val sql = - sql""" - select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and seq = ${seq} - """ - if (useCache) { - withCache(cacheKey)(sql.map { rs => ServiceColumnIndex(rs) }.single.apply) - } else { - sql.map { rs => ServiceColumnIndex(rs) }.single.apply - } - } - - - def findAll(serviceId: Int, serviceColumnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { - val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId" - lazy val sql = - sql""" - select * from $dbTableName where service_id = ${serviceId} and seq > 0 order by seq ASC - """ - if (useCache) { - withCaches(cacheKey)( - sql.map { rs => ServiceColumnIndex(rs) }.list.apply - ) - } else { - sql.map { rs => LabelIndex(rs) }.list.apply - } - } - - def insert(serviceId: Int, - serviceColumnId: Int, - indexName: String, - seq: Byte, metaSeqs: List[Byte], options: Option[String])(implicit session: DBSession = AutoSession): Long = { - sql""" - insert into $dbTableName(service_id, service_column_id, name, seq, meta_seqs, options) - values (${serviceId}, ${serviceColumnId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${options}) - """ - .updateAndReturnGeneratedKey.apply() - } - - def findOrInsert(serviceId: Int, - serviceColumnId: Int, - indexName: String, - metaSeqs: List[Byte], - options: 
Option[String])(implicit session: DBSession = AutoSession): ServiceColumnIndex = { - findBySeqs(serviceId, serviceColumnId, metaSeqs) match { - case Some(s) => s - case None => - val orders = findAll(serviceId, serviceColumnId, false) - val seq = (orders.size + 1).toByte - assert(seq <= MaxOrderSeq) - val createdId = insert(serviceId, serviceColumnId, indexName, seq, metaSeqs, options) - val cacheKeys = toCacheKeys(createdId.toInt, serviceId, serviceColumnId, seq, metaSeqs) - - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - findBySeq(serviceId, serviceColumnId, seq).get - } - } - - def toCacheKeys(id: Int, serviceId: Int, serviceColumnId: Int, seq: Byte, seqs: Seq[Byte]): Seq[String] = { - Seq(s"id=$id", - s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seq=$seq", - s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seqs=$seqs", - s"serviceId=$serviceId:serviceColumnId=$serviceColumnId") - } - - def delete(id: Int)(implicit session: DBSession = AutoSession) = { - val me = findById(id) - val seqs = me.metaSeqs.mkString(",") - val (serviceId, serviceColumnId, seq) = (me.serviceId, me.serviceColumnId, me.seq) - lazy val sql = sql"""delete from $dbTableName where id = ${id}""" - - sql.execute.apply() - - val cacheKeys = toCacheKeys(id, serviceId, serviceColumnId, seq, me.metaSeqs) - - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - } - -// def findAll()(implicit session: DBSession = AutoSession) = { -// val ls = sql"""select * from $dbTableName""".map { rs => ServiceColumnIndex(rs) }.list.apply -// val singles = ls.flatMap { x => -// val cacheKeys = toCacheKeys(x.id.get, x.serviceId, x.serviceColumnId, x.seq, x.metaSeqs).dropRight(1) -// cacheKeys.map { cacheKey => -// cacheKey -> x -// } -// } -// val multies = ls.groupBy(x => (x.serviceId, x.serviceColumnId)).map { case ((serviceId, serviceColumnId), ls) => -// val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId" -// cacheKey -> ls -// 
}.toList -// -// putsToCache(singles) -// putsToCaches(multies) -// -// } -} - -case class ServiceColumnIndex(id: Option[Int], - serviceId: Int, - serviceColumnId: Int, - name: String, - seq: Byte, - metaSeqs: Seq[Byte], - options: Option[String]) { - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index 00ef233..75e9657 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.parsers import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException} -import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.schema.{Label, LabelMeta} import org.apache.s2graph.core.types.InnerValLike import org.apache.s2graph.core.{S2EdgeLike} import org.apache.s2graph.core.JSONParser._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 6b1ce75..f59abc0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -26,7 +26,7 @@ import com.google.common.cache.CacheBuilder import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} import org.apache.s2graph.core.JSONParser._ import 
org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.parsers.{Where, WhereParser} import org.apache.s2graph.core.types._ import play.api.libs.json._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala index 460c627..c768d81 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -24,7 +24,7 @@ import java.net.URL import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException} import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service} +import org.apache.s2graph.core.schema.{Bucket, Experiment, Service} import org.apache.s2graph.core.utils.logger import play.api.libs.json._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala new file mode 100644 index 0000000..c88f854 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.schema + +import play.api.libs.json.{JsValue, Json} +import scalikejdbc._ + +import scala.util.Try + +object Bucket extends SQLSyntaxSupport[Bucket] { + import Schema._ + val className = Bucket.getClass.getSimpleName + + val rangeDelimiter = "~" + val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.") + val InActiveModulars = Set("0~0") + + def valueOf(rs: WrappedResultSet): Bucket = { + Bucket(rs.intOpt("id"), + rs.int("experiment_id"), + rs.string("modular"), + rs.string("http_verb"), + rs.string("api_path"), + rs.string("request_body"), + rs.int("timeout"), + rs.string("impression_id"), + rs.boolean("is_graph_query"), + rs.boolean("is_empty")) + } + + def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = { + val cacheKey = className + "experimentId=" + experimentId + + withCaches(cacheKey, broadcast = false) { + sql"""select * from buckets where experiment_id = $experimentId""" + .map { rs => Bucket.valueOf(rs) }.list().apply() + } + } + + def toRange(str: String): Option[(Int, Int)] = { + val range = str.split(rangeDelimiter) + if (range.length == 2) Option((range.head.toInt, range.last.toInt)) + else None + } + + def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = { + val cacheKey = className + "impressionId=" + 
impressionId + + lazy val sql = sql"""select * from buckets where impression_id=$impressionId""" + .map { rs => Bucket.valueOf(rs)}.single().apply() + + if (useCache) withCache(cacheKey)(sql) + else sql + + } + + def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Bucket = { + val cacheKey = className + "id=" + id + lazy val sql = sql"""select * from buckets where id = $id""".map { rs => Bucket.valueOf(rs)}.single().apply() + if (useCache) withCache(cacheKey, false) { sql }.get + else sql.get + } + + def update(id: Int, + experimentId: Int, + modular: String, + httpVerb: String, + apiPath: String, + requestBody: String, + timeout: Int, + impressionId: String, + isGraphQuery: Boolean, + isEmpty: Boolean)(implicit session: DBSession = AutoSession): Try[Bucket] = { + Try { + sql""" + UPDATE buckets set experiment_id = $experimentId, modular = $modular, http_verb = $httpVerb, api_path = $apiPath, + request_body = $requestBody, timeout = $timeout, impression_id = $impressionId, + is_graph_query = $isGraphQuery, is_empty = $isEmpty WHERE id = $id + """ + .update().apply() + }.map { cnt => + findById(id) + } + } + + def insert(experimentId: Int, modular: String, httpVerb: String, apiPath: String, + requestBody: String, timeout: Int, impressionId: String, + isGraphQuery: Boolean, isEmpty: Boolean) + (implicit session: DBSession = AutoSession): Try[Bucket] = { + Try { + sql""" + INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id, + is_graph_query, is_empty) + VALUES (${experimentId}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId, + $isGraphQuery, $isEmpty) + """ + .updateAndReturnGeneratedKey().apply() + }.map { newId => + Bucket(Some(newId.toInt), experimentId, modular, httpVerb, apiPath, requestBody, timeout, impressionId, + isGraphQuery, isEmpty) + } + } +} + +case class Bucket(id: Option[Int], + experimentId: Int, + modular: String, + httpVerb: String, 
apiPath: String, + requestBody: String, timeout: Int, impressionId: String, + isGraphQuery: Boolean = true, + isEmpty: Boolean = false) { + + import Bucket._ + + lazy val rangeOpt = toRange(modular) + + def toJson(): JsValue = + Json.obj("id" -> id, "experimentId" -> experimentId, "modular" -> modular, "httpVerb" -> httpVerb, + "requestBody" -> requestBody, "isGraphQuery" -> isGraphQuery, "isEmpty" -> isEmpty) + +}
