Repository: incubator-s2graph Updated Branches: refs/heads/master f0f1081b1 -> 32eb344a7
naive implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3ddea3f7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3ddea3f7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3ddea3f7 Branch: refs/heads/master Commit: 3ddea3f78acad63e52ef855642390cb751d172b8 Parents: 33e3d26 Author: daewon <[email protected]> Authored: Wed May 23 19:33:42 2018 +0900 Committer: daewon <[email protected]> Committed: Thu May 24 13:33:05 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/EdgeFetcher.scala | 2 +- .../org/apache/s2graph/core/EdgeMutator.scala | 2 +- .../org/apache/s2graph/core/Management.scala | 93 +++--- .../apache/s2graph/core/ResourceManager.scala | 59 +++- .../scala/org/apache/s2graph/core/S2Graph.scala | 6 +- .../org/apache/s2graph/core/VertexFetcher.scala | 2 +- .../org/apache/s2graph/core/VertexMutator.scala | 2 +- .../core/storage/jdbc/JdbcEdgeFetcher.scala | 80 ++++++ .../core/storage/jdbc/JdbcEdgeMutator.scala | 87 ++++++ .../s2graph/core/storage/jdbc/JdbcStorage.scala | 177 ++++++++++++ .../apache/s2graph/core/utils/Importer.scala | 3 +- .../s2graph/core/utils/SafeUpdateCache.scala | 21 +- .../s2graph/core/fetcher/BaseFetcherTest.scala | 17 +- .../core/storage/jdbc/JdbcStorageTest.scala | 281 +++++++++++++++++++ .../org/apache/s2graph/graphql/TestGraph.scala | 7 +- 15 files changed, 767 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala index c3760e0..82148c6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala @@ -24,7 +24,7 @@ import org.apache.s2graph.core.types.VertexId import scala.concurrent.{ExecutionContext, Future} -trait EdgeFetcher { +trait EdgeFetcher extends AutoCloseable { def init(config: Config)(implicit ec: ExecutionContext): Unit = {} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala index 252d129..db8f142 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala @@ -24,7 +24,7 @@ import org.apache.s2graph.core.storage.MutateResponse import scala.concurrent.{ExecutionContext, Future} -trait EdgeMutator { +trait EdgeMutator extends AutoCloseable { def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index c41e890..0c41ee3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -36,10 +36,11 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.Try /** - * This is designed to be bridge between rest to s2core. - * s2core never use this for finding models. - */ + * This is designed to be bridge between rest to s2core. + * s2core never use this for finding models. + */ object Management { + import HBaseType._ import scala.collection.JavaConversions._ @@ -196,8 +197,8 @@ object Management { service <- Service.findByName(serviceName, useCache = false) serviceColumn <- ServiceColumn.find(service.id.get, columnName) } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, defaultValue, storeInGlobalIndex) - } + ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, defaultValue, storeInGlobalIndex) + } result.getOrElse({ throw new RuntimeException(s"add property on vertex failed") }) @@ -231,11 +232,11 @@ object Management { (k, v) <- js.fields meta <- column.metasInvMap.get(k) } yield { - val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse( - throw new RuntimeException(s"$k is not defined. create schema for vertex.")) + val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse( + throw new RuntimeException(s"$k is not defined. create schema for vertex.")) - (meta.seq.toInt, innerVal) - } + (meta.seq.toInt, innerVal) + } props } @@ -279,20 +280,21 @@ object Management { Label.updateName(tempLabel, rightLabel) } } + def toConfig(params: Map[String, Any]): Config = { import scala.collection.JavaConversions._ val filtered = params.filter { case (k, v) => - v match { - case None => false - case _ => true - } + v match { + case None => false + case _ => true + } }.map { case (k, v) => - val newV = v match { - case Some(value) => value - case _ => v - } - k -> newV + val newV = v match { + case Some(value) => value + case _ => v + } + k -> newV } ConfigFactory.parseMap(filtered) @@ -353,13 +355,13 @@ class Management(graph: S2GraphLike) { } def createStorageTable(zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String = DefaultCompressionAlgorithm, - replicationScopeOpt: Option[Int] = None, - totalRegionCount: Option[Int] = None): Unit = { + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String = DefaultCompressionAlgorithm, + replicationScopeOpt: Option[Int] = None, + totalRegionCount: Option[Int] = None): Unit = { val config = toConfig(Map( ZookeeperQuorum -> zkAddr, // ColumnFamilies -> cfs, @@ -374,11 +376,11 @@ class Management(graph: S2GraphLike) { /** HBase specific code */ def createService(serviceName: String, - cluster: String, - hTableName: String, - preSplitSize: Int, - hTableTTL: Int, - compressionAlgorithm: String): Service = { + cluster: String, + hTableName: String, + preSplitSize: Int, + hTableTTL: Int, + compressionAlgorithm: String): Service = { createService(serviceName, cluster, hTableName, preSplitSize, Option(hTableTTL).filter(_ > -1), compressionAlgorithm).get } @@ -439,6 +441,7 @@ class Management(graph: S2GraphLike) { serviceColumnTry.get } + def createLabel(labelName: String, srcColumn: ServiceColumn, tgtColumn: ServiceColumn, @@ -451,7 +454,8 @@ class Management(graph: S2GraphLike) { hTableTTL: Int, schemaVersion: String, compressionAlgorithm: String, - options: String): Label = { + options: String + ): Label = { import scala.collection.JavaConversions._ createLabel(labelName, @@ -481,13 +485,15 @@ class Management(graph: S2GraphLike) { schemaVersion: String = DEFAULT_VERSION, isAsync: Boolean = false, compressionAlgorithm: String = "gz", - options: Option[String] = None): Try[Label] = { + options: Option[String] = None, + initFetcherWithOptions: Boolean = false + ): Try[Label] = { - if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )") + if (label.length > LABEL_NAME_MAX_LENGTH) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )") if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also") val labelOpt = Label.findByName(label, useCache = false) - Schema withTx { implicit session => + val newLabelTry = Schema withTx { implicit session => if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.") /* create all models */ @@ -509,20 +515,27 @@ class Management(graph: S2GraphLike) { )) storage.createTable(config, newLabel.hbaseTableName) - updateEdgeFetcher(newLabel, None) - updateEdgeFetcher(newLabel, None) - newLabel } + + newLabelTry.foreach { newLabel => + if (initFetcherWithOptions) { + updateEdgeFetcher(newLabel, options) + } else { + updateEdgeFetcher(newLabel, None) + } + } + + newLabelTry } /** * label */ /** - * copy label when if oldLabel exist and newLabel do not exist. - * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. - */ + * copy label when if oldLabel exist and newLabel do not exist. + * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. + */ def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]): Try[Label] = { val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists.")) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala index a8a3a34..423b3c9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala @@ -19,10 +19,15 @@ package org.apache.s2graph.core -import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.config.impl.ConfigImpl +import com.typesafe.config._ import org.apache.s2graph.core.schema.{Label, ServiceColumn} -import org.apache.s2graph.core.utils.SafeUpdateCache +import org.apache.s2graph.core.utils.{SafeUpdateCache, logger} + import scala.concurrent.ExecutionContext +import scala.reflect.ClassTag +import scala.tools.nsc.typechecker.StructuredTypeStrings +import scala.util.Try object ResourceManager { @@ -32,14 +37,19 @@ object ResourceManager { import scala.collection.JavaConverters._ val ClassNameKey = "className" - val EdgeFetcherKey = classOf[EdgeFetcher].getClass().getName + val MaxSizeKey = "resource.cache.max.size" + val CacheTTL = "resource.cache.ttl.seconds" + + val EdgeFetcherKey = classOf[EdgeFetcher].getName - val VertexFetcherKey = classOf[VertexFetcher].getClass().getName + val VertexFetcherKey = classOf[VertexFetcher].getName - val EdgeMutatorKey = classOf[EdgeMutator].getClass.getName - val VertexMutatorKey = classOf[VertexMutator].getClass.getName + val EdgeMutatorKey = classOf[EdgeMutator].getName + val VertexMutatorKey = classOf[VertexMutator].getName - val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> 1000, TtlKey -> -1).asJava) + val DefaultMaxSize = 1000 + val DefaultCacheTTL = -1 + val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> DefaultMaxSize, TtlKey -> DefaultCacheTTL).asJava) } class ResourceManager(graph: S2GraphLike, @@ -49,7 +59,10 @@ class ResourceManager(graph: S2GraphLike, import scala.collection.JavaConverters._ - val cache = new SafeUpdateCache(_config) + val maxSize = Try(_config.getInt(ResourceManager.MaxSizeKey)).getOrElse(DefaultMaxSize) + val cacheTTL = Try(_config.getInt(ResourceManager.CacheTTL)).getOrElse(DefaultCacheTTL) + + val cache = new SafeUpdateCache(maxSize, cacheTTL) def getAllVertexFetchers(): Seq[VertexFetcher] = { cache.asMap().asScala.toSeq.collect { case (_, (obj: VertexFetcher, _, _)) => obj } @@ -59,10 +72,22 @@ class ResourceManager(graph: S2GraphLike, cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj } } + def onEvict(oldValue: AnyRef): Unit = { + oldValue match { + case o: Option[_] => o.foreach { + case v: AutoCloseable => v.close() + } + case v: AutoCloseable => v.close() + case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}") + } + + logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.") + } + def getOrElseUpdateVertexFetcher(column: ServiceColumn, cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = { val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName - cache.withCache(cacheKey, false, cacheTTLInSecs) { + cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) { column.toFetcherConfig.map { fetcherConfig => val className = fetcherConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -81,7 +106,7 @@ class ResourceManager(graph: S2GraphLike, cacheTTLInSecs: Option[Int] = None): Option[EdgeFetcher] = { val cacheKey = EdgeFetcherKey + "_" + label.label - cache.withCache(cacheKey, false, cacheTTLInSecs) { + cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) { label.toFetcherConfig.map { fetcherConfig => val className = fetcherConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -89,7 +114,10 @@ class ResourceManager(graph: S2GraphLike, .newInstance(graph) .asInstanceOf[EdgeFetcher] - fetcher.init(fetcherConfig) + fetcher.init( + fetcherConfig + .withValue("labelName", ConfigValueFactory.fromAnyRef(label.label)) + ) fetcher } @@ -99,7 +127,7 @@ class ResourceManager(graph: S2GraphLike, def getOrElseUpdateVertexMutator(column: ServiceColumn, cacheTTLInSecs: Option[Int] = None): Option[VertexMutator] = { val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName - cache.withCache(cacheKey, false, cacheTTLInSecs) { + cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) { column.toMutatorConfig.map { mutatorConfig => val className = mutatorConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -117,7 +145,7 @@ class ResourceManager(graph: S2GraphLike, def getOrElseUpdateEdgeMutator(label: Label, cacheTTLInSecs: Option[Int] = None): Option[EdgeMutator] = { val cacheKey = EdgeMutatorKey + "_" + label.label - cache.withCache(cacheKey, false, cacheTTLInSecs) { + cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) { label.toMutatorConfig.map { mutatorConfig => val className = mutatorConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -125,7 +153,10 @@ class ResourceManager(graph: S2GraphLike, .newInstance(graph) .asInstanceOf[EdgeMutator] - fetcher.init(mutatorConfig) + fetcher.init( + mutatorConfig + .withValue("labelName", ConfigValueFactory.fromAnyRef(label.label)) + ) fetcher } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 651323f..2ff2eb3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -63,6 +63,8 @@ object S2Graph { "db.default.user" -> "graph", "cache.max.size" -> java.lang.Integer.valueOf(0), "cache.ttl.seconds" -> java.lang.Integer.valueOf(-1), + "resource.cache.max.size" -> java.lang.Integer.valueOf(1000), + "resource.cache.ttl.seconds" -> java.lang.Integer.valueOf(-1), "hbase.client.retries.number" -> java.lang.Integer.valueOf(20), "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort), "hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000), @@ -282,12 +284,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap * right implementation(S2GRAPH-213). * */ override def getVertexFetcher(column: ServiceColumn): VertexFetcher = { - resourceManager.getOrElseUpdateVertexFetcher(column) + resourceManager.getOrElseUpdateVertexFetcher(column, Option(60 * 60 * 24)) .getOrElse(defaultStorage.vertexFetcher) } override def getEdgeFetcher(label: Label): EdgeFetcher = { - resourceManager.getOrElseUpdateEdgeFetcher(label) + resourceManager.getOrElseUpdateEdgeFetcher(label, Option(60 * 60 * 24)) .getOrElse(defaultStorage.edgeFetcher) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala index b641e7f..4addcab 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala @@ -24,7 +24,7 @@ import org.apache.s2graph.core.types.VertexId import scala.concurrent.{ExecutionContext, Future} -trait VertexFetcher { +trait VertexFetcher extends AutoCloseable { def init(config: Config)(implicit ec: ExecutionContext): Unit = {} def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala index d1c8ecf..a7c095a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala @@ -24,7 +24,7 @@ import org.apache.s2graph.core.storage.MutateResponse import scala.concurrent.{ExecutionContext, Future} -trait VertexMutator { +trait VertexMutator extends AutoCloseable { def close(): Unit = {} def init(config: Config)(implicit ec: ExecutionContext): Unit = {} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala new file mode 100644 index 0000000..927635e --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala @@ -0,0 +1,80 @@ +package org.apache.s2graph.core.storage.jdbc + +import com.typesafe.config.Config +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.{Label, LabelMeta} +import scalikejdbc.{ConnectionPool, ConnectionPoolSettings} + +import scala.concurrent.{ExecutionContext, Future} +import scalikejdbc._ + +class JdbcEdgeFetcher(graph: S2GraphLike) extends EdgeFetcher { + + import JdbcStorage._ + + var selectQuery: SQLSyntax = _ + + override def init(config: Config)(implicit ec: ExecutionContext): Unit = { + import JdbcStorage._ + + val labelName = config.getString("labelName") + val label = Label.findByName(labelName, useCache = false).get + + selectQuery = SQLSyntax.createUnsafely( + s"SELECT ${affectedColumns(label).mkString(", ")} FROM ${labelTableName(label)}" + ) + + JdbcStorage.prepareConnection(config) + createTable(label) + } + + def toEdge(metas: Seq[LabelMeta], labelName: String, direction: String, rs: Map[String, Any]): S2EdgeLike = { + val from = rs("_from".toUpperCase()) + val to = rs("_to".toUpperCase()) + val timestamp = rs("_timestamp".toUpperCase()).asInstanceOf[java.sql.Timestamp].millis + + val props = metas.map { meta => + meta.name -> rs(meta.name.toUpperCase()) + }.toMap + + if (direction == "out") { + graph.toEdge(from, to, labelName, direction, props, timestamp) + } else { + graph.toEdge(to, from, labelName, direction, props, timestamp) + } + } + + override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { + withReadOnlySession { implicit session => + val stepResultLs = queryRequests.map { queryRequest => + val vertex = queryRequest.vertex + val queryParam = queryRequest.queryParam + val label = queryParam.label + val metas = label.metas() + + val limit = queryParam.limit + val offset = queryParam.offset + val cond = vertex.innerId.toIdString() + val orderColumn = SQLSyntax.createUnsafely("_timestamp") + + val rows = if (queryParam.direction == "out") { + sql"""${selectQuery} WHERE _from = ${cond} ORDER BY ${orderColumn} DESC offset ${offset} limit ${limit}""".map(Row.apply).list().apply() + } else { + sql"""${selectQuery} WHERE _to = ${cond} ORDER BY ${orderColumn} DESC offset ${offset} limit ${limit}""".map(Row.apply).list().apply() + } + + val edgeWithScores = rows.map { row => + val edge = toEdge(metas, label.label, queryParam.direction, row.row) + EdgeWithScore(edge, 1.0, queryParam.label) + } + + StepResult(edgeWithScores, Nil, Nil) + } + + Future.successful(stepResultLs) + } + } + + override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = ??? +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala new file mode 100644 index 0000000..96c214b --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.jdbc + +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.storage.MutateResponse +import org.joda.time.DateTime +import scalikejdbc._ + +import scala.concurrent.{ExecutionContext, Future} + +class JdbcEdgeMutator(graph: S2GraphLike) extends EdgeMutator { + + import JdbcStorage._ + + override def init(config: Config)(implicit ec: ExecutionContext): Unit = { + val labelName = config.getString("labelName") + val label = Label.findByName(labelName, useCache = false).get + + JdbcStorage.prepareConnection(config) + createTable(label) + } + + override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + _edges.groupBy(_.innerLabel).flatMap { case (label, edges) => + val affectedColumns = JdbcStorage.affectedColumns(label) + + val insertValues = edges.map { edge => + val values = affectedColumns.collect { + case "_timestamp" => new DateTime(edge.ts) + case "_from" => edge.srcForVertex.innerId.value + case "_to" => edge.tgtForVertex.innerId.value + case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull + } + + values + } + + val columns = affectedColumns.mkString(", ") + val table = JdbcStorage.labelTableName(label) + val prepared = affectedColumns.map(_ => "?").mkString(", ") + + val conflictCheckKeys = + if (label.consistencyLevel == "strong") "(_from, _to)" + else "(_from, _to, _timestamp)" + + val sqlRaw = + s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared})""".stripMargin + + val sql = SQLSyntax.createUnsafely(sqlRaw) + withTxSession { session => + sql"""${sql}""".batch(insertValues: _*).apply()(session) + } + + insertValues + } + + Future.successful(_edges.map(_ => true)) + } + + override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = ??? + + override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = ??? + + override def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long)(implicit ec: ExecutionContext): Future[MutateResponse] = ??? + + override def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, requestTs: Long, retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = ??? +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala new file mode 100644 index 0000000..cf47e41 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.jdbc + +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema._ +import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageSerDe} +import org.apache.s2graph.core.utils.logger +import scalikejdbc._ + +object JdbcStorage { + + case class Row(row: Map[String, Any]) + + object Row { + def apply(rs: WrappedResultSet): Row = Row(rs.toMap) + } + + val PoolName = "_META_JDBC_STORAGE_" + val TablePrefix = "_EDGE_STORE" + + def toMySqlType(tpe: String): String = tpe match { + case "int" | "integer" | "i" | "int32" | "integer32" => "int(32)" + case "string" | "str" | "s" => "varchar(256)" + case "boolean" | "bool" => "tinyint(1)" + case "long" | "l" | "int64" | "integer64" => "int(64)" + case "float64" | "float" | "f" | "float32" => "float" + case "double" | "d" => "double" + case _ => "" + } + + def buildIndex(label: Label, indices: List[LabelIndex]): String = { + if (indices.isEmpty) s"KEY `${label.label}_PK` (`_timestamp`)" + "," + else { + val nameWithFields = indices.map { index => + val name = index.name + val fields = index.propNames + + name -> fields.toList.map(n => s"`${n}`") + } + + val ret = nameWithFields.map { case (name, fields) => + s"KEY `${label.label}_${name}` (${fields.mkString(", ")})" + } + + ret.mkString(",\n") + "," + } + } + + def buildProps(metas: List[LabelMeta]): String = { + if (metas.isEmpty) "" + else { + val ret = metas.map { meta => + s"""`${meta.name}` ${toMySqlType(meta.dataType)}""".trim + } + + (ret.mkString(",\n ") + ",").trim + } + } + + def showSchema(label: Label): String = { + val srcColumn = label.srcColumn + val tgtColumn = label.srcColumn + val metas = LabelMeta.findAllByLabelId(label.id.get, useCache = false).sortBy(_.name) + val consistency = label.consistencyLevel + val indices = label.indices + val isUnique = if (consistency == "strong") "UNIQUE" else "" + + s""" + |CREATE TABLE `${labelTableName(label)}`( + | `id` int(11) NOT NULL AUTO_INCREMENT, + | `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP, + | `_from` ${toMySqlType(srcColumn.columnType)} NOT NULL, + | `_to` ${toMySqlType(tgtColumn.columnType)} NOT NULL, + | PRIMARY KEY (`id`), + | ${buildProps(metas)} + | ${buildIndex(label, indices)} + | ${isUnique} KEY `${label.label}_from` (`_from`,`_to`), + | ${isUnique} KEY `${label.label}_to` (`_to`,`_from`) + |) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """.trim.stripMargin + } + + def labelTableName(label: Label): String = s"${TablePrefix}_${label.label}" + + def affectedColumns(label: Label): Seq[String] = { + Seq("_timestamp", "_from", "_to") ++ LabelMeta.findAllByLabelId(label.id.get, useCache = false).sortBy(_.name).map(_.name) + } + + def withTxSession[T](f: => DBSession => T): T = { + using(DB(ConnectionPool.borrow(PoolName))) { db => + db localTx { implicit session => + f(session) + } + } + } + + def withReadOnlySession[T](f: => DBSession => T): T = { + using(DB(ConnectionPool.borrow(PoolName))) { db => + f(db.readOnlySession()) + } + } + + def dropTable(label: Label): Unit = { + val cmd = s"DROP TABLE IF EXISTS ${labelTableName(label)}" + + withTxSession { session => + sql"${SQLSyntax.createUnsafely(cmd)}".execute().apply()(session) + } + } + + def tables = DB(ConnectionPool.borrow(PoolName)).getTableNames().map(_.toLowerCase()) + + def createTable(label: Label): Boolean = { + val schema = showSchema(label) + val tableName = labelTableName(label) + + if (tables.contains(tableName.toLowerCase())) { + logger.info(s"Table exists ${tableName}") + } else { + withTxSession { session => + sql"${SQLSyntax.createUnsafely(schema)}".execute().apply()(session) + } + } + + true + } + + def prepareConnection(config: Config): Unit = { + val jdbcDriverName = config.getString("driver") + Class.forName(jdbcDriverName) + + if (!ConnectionPool.isInitialized(PoolName)) { + val jdbcUrl = config.getString("url") + val user = config.getString("user") + val password = config.getString("password") + val settings = ConnectionPoolSettings(initialSize = 10, maxSize = 10, connectionTimeoutMillis = 30000L, validationQuery = "select 1;") + + ConnectionPool.add(PoolName, jdbcUrl, user, password, settings) + } + } +} + +class JdbcStorage(graph: S2GraphLike, config: Config) { + + implicit val ec = graph.ec + + JdbcStorage.prepareConnection(config) + + val h2EdgeFetcher = new JdbcEdgeFetcher(graph) + h2EdgeFetcher.init(config) + + val h2EdgeMutator = new JdbcEdgeMutator(graph) + h2EdgeMutator.init(config) + + val edgeFetcher: EdgeFetcher = h2EdgeFetcher + val edgeMutator: EdgeMutator = h2EdgeMutator +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala index 300106a..1a5cbde 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala @@ -24,8 +24,7 @@ import java.io.File import com.typesafe.config.Config import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike} -import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{S2GraphLike} import scala.concurrent.{ExecutionContext, Future} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index 3ecb02f..323ee9d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean import com.google.common.cache.CacheBuilder import com.google.common.hash.Hashing -import com.typesafe.config.Config +import com.typesafe.config.{Config, ConfigFactory} import scala.collection.JavaConversions._ import scala.concurrent.{ExecutionContext, Future} @@ -85,8 +85,17 @@ class SafeUpdateCache(val config: Config) import java.lang.{Long => JLong} import SafeUpdateCache._ + + val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey) val systemTtl = config.getInt(SafeUpdateCache.TtlKey) + + def this(maxSize: Int, systemTtl: Int)(implicit ec: ExecutionContext) { + this(ConfigFactory.parseMap( + Map(SafeUpdateCache.MaxSizeKey -> maxSize, SafeUpdateCache.TtlKey -> systemTtl)) + ) + } + private val cache = CacheBuilder.newBuilder().maximumSize(maxSize) .build[JLong, (AnyRef, Int, AtomicBoolean)]() @@ -114,9 +123,14 @@ class SafeUpdateCache(val config: Config) cache.invalidate(cacheKey) } + def defaultOnEvict(oldValue: AnyRef): Unit = { + logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.") + } + def withCache[T <: AnyRef](key: String, broadcast: Boolean, - cacheTTLInSecs: Option[Int] = None)(op: => T): T = { + cacheTTLInSecs: Option[Int] = None, + onEvict: AnyRef => Unit = defaultOnEvict)(op: => T): T = { val cacheKey = toCacheKey(key) val cachedValWithTs = cache.getIfPresent(cacheKey) @@ -145,6 +159,9 @@ class SafeUpdateCache(val config: Config) logger.error(s"withCache update failed: $cacheKey", ex) case Success(newValue) => put(key, newValue, broadcast = (broadcast && newValue != cachedVal)) + + onEvict(cachedVal) + logger.info(s"withCache update success: $cacheKey") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala index 1b71a4c..79021c8 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala @@ -23,7 +23,7 @@ import com.typesafe.config.{Config, ConfigFactory} import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.rest.RequestParser import org.apache.s2graph.core._ -import org.apache.s2graph.core.schema.{Label, Service, ServiceColumn} +import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn} import org.scalatest._ import scala.concurrent.{Await, ExecutionContext} @@ -67,7 +67,13 @@ trait BaseFetcherTest extends FunSuite with Matchers with BeforeAndAfterAll { val serviceColumn = management.createServiceColumn(serviceName, columnName, "string", Nil) - Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + Label.findByName(labelName, useCache = false).foreach { label => + label.labelMetaSet.foreach { lm => + LabelMeta.delete(lm.id.get) + } + + Label.delete(label.id.get) + } val label = management.createLabel( labelName, @@ -81,12 +87,13 @@ trait BaseFetcherTest extends FunSuite with Matchers with BeforeAndAfterAll { Seq.empty[Index], Seq(Prop(name = "score", defaultValue = "0.0", dataType = "double")), isDirected = true, - consistencyLevel = "strong", + consistencyLevel = "strong", hTableName = None, hTableTTL = None, schemaVersion = "v3", - compressionAlgorithm = "gz", - options = options + compressionAlgorithm = "gz", + options = options, + initFetcherWithOptions = true ).get management.updateEdgeFetcher(label, options) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala new file mode 100644 index 0000000..f06b2e6 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.fetcher.sql + +import org.apache.s2graph.core.Management.JsonModel._ +import org.apache.s2graph.core._ +import org.apache.s2graph.core.fetcher.BaseFetcherTest +import org.apache.s2graph.core.schema._ +import org.apache.s2graph.core.storage.jdbc._ + +import scala.concurrent._ +import scala.concurrent.duration._ + +class JdbcStorageTest extends BaseFetcherTest { + implicit lazy val ec = graph.ec + + val edgeFetcherName = classOf[JdbcEdgeFetcher].getName + val edgeMutatorName = classOf[JdbcEdgeMutator].getName + + val options = + s""" + |{ + | "fetcher": { + | "className": "${edgeFetcherName}", + | "url": "jdbc:h2:file:/tmp/s2graph/metastore;MODE=MYSQL", + | "driver": "org.h2.Driver", + | "password": "sa", + | "user": "sa" + | }, + | "mutator": { + | "className": "${edgeMutatorName}", + | "url": "jdbc:h2:file:/tmp/s2graph/metastore;MODE=MYSQL", + | "driver": "org.h2.Driver", + | "password": "sa", + | "user": "sa" + | } + |} + """.stripMargin + + val serviceName = "s2graph" + val columnName = "user" + + var service: Service = _ + var serviceColumn: ServiceColumn = _ + + override def beforeAll: Unit = { + super.beforeAll + + service = management.createService(serviceName, "localhost", "s2graph_htable", -1, None).get + serviceColumn = management.createServiceColumn(serviceName, columnName, "string", Nil) + } + + def clearLabel(labelName: String): Unit = { + Label.findByName(labelName, useCache = false).foreach { label => + label.labelMetaSet.foreach { lm => + LabelMeta.delete(lm.id.get) + } + Label.delete(label.id.get) + } + } + + def createLabel(labelName: String, + consistencyLevel: String, + props: Seq[Prop], + indices: Seq[Index]): Label = { + + clearLabel(labelName) + + val label = management.createLabel( + labelName, + service.serviceName, serviceColumn.columnName, serviceColumn.columnType, + service.serviceName, serviceColumn.columnName, serviceColumn.columnType, + service.serviceName, + indices, + props, + isDirected = true, consistencyLevel = consistencyLevel, hTableName = None, hTableTTL = None, + schemaVersion = "v3", compressionAlgorithm = "gz", options = Option(options), + initFetcherWithOptions = true + ).get + + management.updateEdgeFetcher(label, Option(options)) + label + } + + def normalise(s: String): String = s.split("\n").map(_.trim).filterNot(_.isEmpty).mkString("\n") + + test("CreateLabel - label A: {strong, defaultIndex}") { + val props = Seq( + Prop(name = "score", defaultValue = "0.0", dataType = "double"), + Prop(name = "age", defaultValue = "0", dataType = "int") + ) + val indices = Nil + + val label = createLabel("A", "strong", props, indices) + + val expectedColumn = Seq("_timestamp", "_from", "_to", "age", "score") + val actualColumn = JdbcStorage.affectedColumns(label) + + actualColumn shouldBe expectedColumn + + val expectedSchema = + """ + CREATE TABLE `_EDGE_STORE_A`( + `id` int(11) NOT NULL AUTO_INCREMENT, + `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP, + `_from` varchar(256) NOT NULL, + `_to` varchar(256) NOT NULL, + PRIMARY KEY (`id`), + `age` int(32), + `score` double, + KEY `A__PK` (`_timestamp`), + UNIQUE KEY `A_from` (`_from`,`_to`), + UNIQUE KEY `A_to` (`_to`,`_from`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """ + + val actualSchema = JdbcStorage.showSchema(label) + println(actualSchema) + + normalise(actualSchema) shouldBe normalise(expectedSchema) + } + + test("CreateLabel - label B: {strong, (age, score) index}") { + val props = Seq( + Prop(name = "score", defaultValue = "0.0", dataType = "double"), + Prop(name = "age", defaultValue = "0", dataType = "int") + ) + val indices = Seq( + Index("idx_age_score", Seq("age", "score")) + ) + + val label = createLabel("B", "strong", props, indices) + + val expectedColumn = Seq("_timestamp", "_from", "_to", "age", "score") + val actualColumn = JdbcStorage.affectedColumns(label) + + actualColumn shouldBe expectedColumn + + val expectedSchema = + """ + CREATE TABLE `_EDGE_STORE_B`( + `id` int(11) NOT NULL AUTO_INCREMENT, + `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP, + `_from` varchar(256) NOT NULL, + `_to` varchar(256) NOT NULL, + PRIMARY KEY (`id`), + `age` int(32), + `score` double, + KEY `B_idx_age_score` (`age`, `score`), + UNIQUE KEY `B_from` (`_from`,`_to`), + UNIQUE KEY `B_to` (`_to`,`_from`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """ + + val actualSchema = JdbcStorage.showSchema(label) + normalise(actualSchema) shouldBe normalise(expectedSchema) + } + + test("CreateLabel - label C: {weak, defaultIndex}") { + val props = Seq( + Prop(name = "score", defaultValue = "0.0", dataType = "double"), + Prop(name = "age", defaultValue = "0", dataType = "int") + ) + val indices = Nil + + val label = createLabel("C", "weak", props, indices) + + val expectedColumn = Seq("_timestamp", "_from", "_to", "age", "score") + val actualColumn = JdbcStorage.affectedColumns(label) + + actualColumn shouldBe expectedColumn + + val expectedSchema = + """ + CREATE TABLE `_EDGE_STORE_C`( + `id` int(11) NOT NULL AUTO_INCREMENT, + `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP, + `_from` varchar(256) NOT NULL, + `_to` varchar(256) NOT NULL, + PRIMARY KEY (`id`), + `age` int(32), + `score` double, + KEY `C__PK` (`_timestamp`), + KEY `C_from` (`_from`,`_to`), + KEY `C_to` (`_to`,`_from`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + """ + + val actualSchema = JdbcStorage.showSchema(label) + normalise(actualSchema) shouldBe normalise(expectedSchema) + } + + test("Mutate and fetch edges - label A") { + val label = Label.findByName("A", useCache = false).get + + JdbcStorage.dropTable(label) + JdbcStorage.createTable(label) + + val fetcher = graph.getEdgeFetcher(label) + val mutator = graph.getEdgeMutator(label) + + val edgeElric = + graph.toEdge("daewon", "elric", label.label, "out", Map("score" -> 90), ts = 10) + + val edgeRain = + graph.toEdge("daewon", "rain", label.label, "out", Map("score" -> 50), ts = 5) + + val edgeShon = + graph.toEdge("daewon", "shon", label.label, "out", Map("score" -> 100), ts = 15) + + val edgeElricUpdated = + graph.toEdge("daewon", "elric", label.label, "out", Map("score" -> 200), ts = 20) + + val insertEdges = Seq(edgeShon, edgeElric, edgeRain) + val updateEdges = Seq(edgeElricUpdated) + + Await.ready(mutator.mutateStrongEdges("", insertEdges, true), Duration("10 sec")) + Await.ready(mutator.mutateStrongEdges("", updateEdges, true), Duration("10 sec")) + + val qp = QueryParam(labelName = label.label, offset = 0, limit = 10) + val vertex = edgeElric.srcVertex + val step = Step(Seq(qp)) + val q = Query(Seq(vertex), steps = Vector(step)) + val qr = QueryRequest(q, 0, vertex, qp) + + val fetchedEdges = Await.result( + fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge) + ) + + fetchedEdges shouldBe Seq(edgeElricUpdated, edgeShon, edgeRain) // order by timestamp desc + } + + test("Mutate and fetch edges - label C") { + val label = Label.findByName("C", useCache = false).get + + JdbcStorage.dropTable(label) + JdbcStorage.createTable(label) + + val fetcher = graph.getEdgeFetcher(label) + val mutator = graph.getEdgeMutator(label) + + val edgeShon = + graph.toEdge("daewon", "shon", label.label, "out", Map("score" -> 90), ts = 10) + + val edgeShon2 = + graph.toEdge("daewon", "shon", label.label, "out", Map("score" -> 50), ts = 5) + + val insertEdges = Seq(edgeShon, edgeShon2) + + Await.ready(mutator.mutateStrongEdges("", insertEdges, true), Duration("10 sec")) + + val qp = QueryParam(labelName = label.label, offset = 0, limit = 10, duplicatePolicy = DuplicatePolicy.Raw) + val vertex = edgeShon.srcVertex + val step = Step(Seq(qp)) + val q = Query(Seq(vertex), steps = Vector(step)) + val qr = QueryRequest(q, 0, vertex, qp) + + val fetchedEdges = Await.result( + fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge) + ) + + fetchedEdges shouldBe Seq(edgeShon, edgeShon2) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala index 72412f5..ef01d52 100644 --- a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala +++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala @@ -77,7 +77,7 @@ trait TestGraph { } class EmptyGraph(config: Config) extends TestGraph { - Schema.apply(config) + org.apache.s2graph.core.schema.Schema.apply(config) lazy val graph = new S2Graph(config)(scala.concurrent.ExecutionContext.Implicits.global) lazy val management = new Management(graph) @@ -92,7 +92,7 @@ class EmptyGraph(config: Config) extends TestGraph { override def repository: GraphRepository = s2Repository override def open(): Unit = { - Schema.shutdown(true) + org.apache.s2graph.core.schema.Schema.shutdown(true) } } @@ -131,9 +131,10 @@ class BasicGraph(config: Config) extends EmptyGraph(config) { labelName, serviceName, columnName, "string", serviceName, columnName, "string", - true, serviceName, + serviceName, Nil, Seq(Prop("score", "0", "int")), + true, "strong" ) }
