Repository: incubator-s2graph Updated Branches: refs/heads/master c1698e31e -> 07a5af39a
add background task on ResourceManager onEvict. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/447eca4c Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/447eca4c Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/447eca4c Branch: refs/heads/master Commit: 447eca4c24e3915deaa86c8a2e2222976308ea92 Parents: 08d6a3e Author: DO YUNG YOON <[email protected]> Authored: Mon Jul 9 16:27:22 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Jul 9 16:27:22 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/core/ResourceManager.scala | 28 +++++++++++++++----- .../scala/org/apache/s2graph/core/S2Graph.scala | 8 +++--- .../s2graph/core/utils/SafeUpdateCache.scala | 2 +- 3 files changed, 28 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/447eca4c/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala index 051ca9f..14ff767 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala @@ -19,6 +19,8 @@ package org.apache.s2graph.core +import java.util.concurrent.{Executors, TimeUnit} + import com.typesafe.config.impl.ConfigImpl import com.typesafe.config._ import org.apache.s2graph.core.schema.{Label, ServiceColumn} @@ -47,7 +49,7 @@ object ResourceManager { val EdgeMutatorKey = classOf[EdgeMutator].getName val VertexMutatorKey = classOf[VertexMutator].getName - val DefaultMaxSize = 1000 + val DefaultMaxSize = 10 val DefaultCacheTTL = -1 val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> DefaultMaxSize, TtlKey -> DefaultCacheTTL).asJava) } @@ -59,6 +61,13 @@ class ResourceManager(graph: S2GraphLike, import scala.collection.JavaConverters._ + def shutdown(): Unit = { + cache.asMap().asScala.foreach { case (_, (obj, _, _)) => + onEvict(obj) + } + } + val scheduler = Executors.newScheduledThreadPool(1) + val waitForEvictionInSeconds = 10 val maxSize = Try(_config.getInt(ResourceManager.MaxSizeKey)).getOrElse(DefaultMaxSize) val cacheTTL = Try(_config.getInt(ResourceManager.CacheTTL)).getOrElse(DefaultCacheTTL) @@ -72,21 +81,28 @@ class ResourceManager(graph: S2GraphLike, cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj } } + def onEvict(oldValue: AnyRef): Unit = { oldValue match { case o: Option[_] => o.foreach { case v: AutoCloseable => - v.close() - logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.") + scheduler.schedule(newCloseTask(v), waitForEvictionInSeconds, TimeUnit.SECONDS) } case v: AutoCloseable => - v.close() - logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.") - + scheduler.schedule(newCloseTask(v), waitForEvictionInSeconds, TimeUnit.SECONDS) case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}") } } + private def newCloseTask(v: AutoCloseable) = { + new Runnable { + override def run(): Unit = { + v.close() + logger.info(s"[${v.getClass.getName}]: $v evicted.") + } + } + } + def getOrElseUpdateVertexFetcher(column: ServiceColumn, cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = { val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/447eca4c/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 7d6e20b..9657e10 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -295,6 +295,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap if (running.compareAndSet(true, false)) { flushStorage() Schema.shutdown(modelDataDelete) + resourceManager.shutdown() defaultStorage.shutdown() localLongId.set(0l) } @@ -353,9 +354,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) } - indexProvider.mutateVerticesAsync(vertices) - Future.sequence(futures).map{ ls => - ls.flatten.toSeq.sortBy(_._2).map(_._1) + Future.sequence(futures).flatMap { ls => + indexProvider.mutateVerticesAsync(vertices).map { _ => + ls.flatten.toSeq.sortBy(_._2).map(_._1) + } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/447eca4c/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index 755b8d0..bf1f22c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -161,8 +161,8 @@ class SafeUpdateCache(val config: Config) put(key, cachedVal, false) logger.error(s"withCache update failed: $cacheKey", ex) case Success(newValue) => - put(key, newValue, broadcast = (broadcast && newValue != cachedVal)) + put(key, newValue, broadcast = (broadcast && newValue != cachedVal)) onEvict(cachedVal) cachedVal match {
