implement jdbc driver for H2 DB
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b1c88a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b1c88a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b1c88a8 Branch: refs/heads/master Commit: 2b1c88a81d1b9524d6746a705f32db4c4c407d55 Parents: 3ddea3f Author: daewon <[email protected]> Authored: Mon May 28 12:59:42 2018 +0900 Committer: daewon <[email protected]> Committed: Mon May 28 13:01:20 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/core/ResourceManager.scala | 13 +-- .../core/storage/jdbc/JdbcEdgeFetcher.scala | 19 +++++ .../core/storage/jdbc/JdbcEdgeMutator.scala | 90 ++++++++++++++------ .../s2graph/core/utils/SafeUpdateCache.scala | 10 ++- .../core/storage/jdbc/JdbcStorageTest.scala | 22 ++++- 5 files changed, 119 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala index 423b3c9..051ca9f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala @@ -74,14 +74,17 @@ class ResourceManager(graph: S2GraphLike, def onEvict(oldValue: AnyRef): Unit = { oldValue match { - case o: Option[_] => o.foreach { - case v: AutoCloseable => v.close() + case o: Option[_] => o.foreach { case v: AutoCloseable => + v.close() + logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.") } - case v: AutoCloseable => v.close() + + case v: AutoCloseable => + v.close() + logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.") + case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}") } - - logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.") } def getOrElseUpdateVertexFetcher(column: ServiceColumn, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala index 927635e..ea3df74 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.s2graph.core.storage.jdbc import com.typesafe.config.Config http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala index 96c214b..1650a2d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala @@ -23,6 +23,7 @@ import com.typesafe.config.Config import org.apache.s2graph.core._ import org.apache.s2graph.core.schema.Label import org.apache.s2graph.core.storage.MutateResponse +import org.apache.s2graph.core.utils.logger import org.joda.time.DateTime import scalikejdbc._ @@ -40,44 +41,79 @@ class JdbcEdgeMutator(graph: S2GraphLike) extends EdgeMutator { createTable(label) } - override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { - _edges.groupBy(_.innerLabel).flatMap { case (label, edges) => - val affectedColumns = JdbcStorage.affectedColumns(label) - - val insertValues = edges.map { edge => - val values = affectedColumns.collect { - case "_timestamp" => new DateTime(edge.ts) - case "_from" => edge.srcForVertex.innerId.value - case "_to" => edge.tgtForVertex.innerId.value - case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull - } - - values - } + def extractValues(affectedColumns: Seq[String], edge: S2EdgeLike): Seq[Any] = { + val values = affectedColumns.collect { + case "_timestamp" => new DateTime(edge.ts) + case "_from" => edge.srcForVertex.innerId.value + case "_to" => edge.tgtForVertex.innerId.value + case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull + } + + values + } + + def keyColumns(label: Label): Seq[String] = + if (label.consistencyLevel == "strong") Seq("_from", "_to") + else Seq("_from", "_to", "_timestamp") + + def insertOp(label: Label, edges: Seq[S2EdgeLike]): Seq[Boolean] = { + val table = JdbcStorage.labelTableName(label) + val affectedColumns = JdbcStorage.affectedColumns(label) + val insertValues = edges.map { edge => extractValues(affectedColumns, edge) } + + val columns = affectedColumns.mkString(", ") + val prepared = affectedColumns.map(_ => "?").mkString(", ") + + val conflictCheckKeys = keyColumns(label).mkString("(", ",", ")") + + val sqlRaw = s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared});""" + logger.debug(sqlRaw) + + val sql = SQLSyntax.createUnsafely(sqlRaw) + val ret = withTxSession { session => + sql"""${sql}""".batch(insertValues: _*).apply()(session) + } + + ret.map(_ => true) + } - val columns = affectedColumns.mkString(", ") - val table = JdbcStorage.labelTableName(label) - val prepared = affectedColumns.map(_ => "?").mkString(", ") + def deleteOp(label: Label, edges: Seq[S2EdgeLike]): Seq[Boolean] = { + val table = JdbcStorage.labelTableName(label) - val conflictCheckKeys = - if (label.consistencyLevel == "strong") "(_from, _to)" - else "(_from, _to, _timestamp)" + val keyColumnLs = keyColumns(label) + val deleteValues = edges.map { edge => extractValues(keyColumnLs, edge) } + val prepared = keyColumnLs.map(k => s"${k} = ?").mkString(" AND ") - val sqlRaw = - s"""MERGE INTO ${table} (${columns}) KEY ${conflictCheckKeys} VALUES (${prepared})""".stripMargin + val sqlRaw = s"""DELETE FROM ${table} WHERE ($prepared);""" + logger.debug(sqlRaw) + val ret = withTxSession { session => val sql = SQLSyntax.createUnsafely(sqlRaw) - withTxSession { session => - sql"""${sql}""".batch(insertValues: _*).apply()(session) + deleteValues.map { deleteValue => + sql"""${sql}""".batch(Seq(deleteValue): _*).apply()(session) } + } + + ret.map(_ => true) + } - insertValues + override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + val ret = _edges.groupBy(_.innerLabel).flatMap { case (label, edges) => + val (edgesToDelete, edgesToInsert) = edges.partition(_.getOperation() == "delete") + + insertOp(label, edgesToInsert) ++ deleteOp(label, edgesToDelete) } - Future.successful(_edges.map(_ => true)) + Future.successful(ret.toSeq) } - override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = ??? + override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { + val ret = mutateStrongEdges(zkQuorum, _edges, withWait).map { ret => + ret.zipWithIndex.map { case (r, i) => i -> r } + } + + ret + } override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = ??? http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index 323ee9d..755b8d0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -124,7 +124,10 @@ class SafeUpdateCache(val config: Config) } def defaultOnEvict(oldValue: AnyRef): Unit = { - logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.") + oldValue match { + case None => + case _ => logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.") + } } def withCache[T <: AnyRef](key: String, @@ -162,7 +165,10 @@ class SafeUpdateCache(val config: Config) onEvict(cachedVal) - logger.info(s"withCache update success: $cacheKey") + cachedVal match { + case None => + case _ => logger.info(s"withCache update success: $cacheKey") + } } cachedVal http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b1c88a8/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala index f06b2e6..424e5ba 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala @@ -245,9 +245,19 @@ class JdbcStorageTest extends BaseFetcherTest { ) fetchedEdges shouldBe Seq(edgeElricUpdated, edgeShon, edgeRain) // order by timestamp desc + + // delete + val deleteEdge = Seq(edgeElric.copyOp(GraphUtil.operations("delete"))) + Await.ready(mutator.mutateStrongEdges("", deleteEdge, true), Duration("10 sec")) + + val fetchedEdgesAfterDeleted = Await.result( + fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge) + ) + + fetchedEdgesAfterDeleted shouldBe Seq(edgeShon, edgeRain) // elric was deleted } - test("Mutate and fetch edges - label C") { + test("Mutate and fetch edges - label C(weak)") { val label = Label.findByName("C", useCache = false).get JdbcStorage.dropTable(label) @@ -277,5 +287,15 @@ class JdbcStorageTest extends BaseFetcherTest { ) fetchedEdges shouldBe Seq(edgeShon, edgeShon2) + + // delete + val deleteEdge = Seq(edgeShon.copyOp(GraphUtil.operations("delete"))) + Await.ready(mutator.mutateStrongEdges("", deleteEdge, true), Duration("10 sec")) + + val fetchedEdgesAfterDeleted = Await.result( + fetcher.fetches(Seq(qr), Map.empty), Duration("10 sec")).flatMap(_.edgeWithScores.map(_.edge) + ) + + fetchedEdgesAfterDeleted shouldBe Seq(edgeShon2) // ts is diff } }
