Add an optional cacheTTLInSecs parameter to SafeUpdateCache.withCache, replacing the forceUpdate flag.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8230417 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8230417 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8230417 Branch: refs/heads/master Commit: e823041715dcca2d330bb25410a0a7cf420bc522 Parents: 9132f74 Author: DO YUNG YOON <[email protected]> Authored: Thu May 10 10:35:35 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu May 10 10:35:35 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/Management.scala | 8 ++++---- .../apache/s2graph/core/ResourceManager.scala | 20 ++++++++++++-------- .../s2graph/core/utils/SafeUpdateCache.scala | 9 ++++++--- 3 files changed, 22 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index c3aef7a..f534423 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -313,7 +313,7 @@ class Management(graph: S2GraphLike) { def updateEdgeFetcher(label: Label, options: String): Unit = { val newLabel = Label.updateOption(label, options) - graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, forceUpdate = true) + graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, cacheTTLInSecs = Option(-1)) } def updateVertexFetcher(serviceName: String, columnName: String, options: String): Unit = { @@ -325,7 +325,7 @@ class Management(graph: S2GraphLike) { def updateVertexFetcher(column: 
ServiceColumn, options: String): Unit = { val newColumn = ServiceColumn.updateOption(column, options) - graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, forceUpdate = true) + graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, cacheTTLInSecs = Option(-1)) } def updateEdgeMutator(labelName: String, options: String): Unit = { @@ -336,7 +336,7 @@ class Management(graph: S2GraphLike) { def updateEdgeMutator(label: Label, options: String): Unit = { val newLabel = Label.updateOption(label, options) - graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, forceUpdate = true) + graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, cacheTTLInSecs = Option(-1)) } def updateVertexMutator(serviceName: String, columnName: String, options: String): Unit = { @@ -348,7 +348,7 @@ class Management(graph: S2GraphLike) { def updateVertexMutator(column: ServiceColumn, options: String): Unit = { val newColumn = ServiceColumn.updateOption(column, options) - graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, forceUpdate = true) + graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, cacheTTLInSecs = Option(-1)) } def createStorageTable(zkAddr: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala index b877603..a8a3a34 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala @@ -59,9 +59,10 @@ class ResourceManager(graph: S2GraphLike, cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj } } - def getOrElseUpdateVertexFetcher(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexFetcher] = { + 
def getOrElseUpdateVertexFetcher(column: ServiceColumn, + cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = { val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName - cache.withCache(cacheKey, false, forceUpdate) { + cache.withCache(cacheKey, false, cacheTTLInSecs) { column.toFetcherConfig.map { fetcherConfig => val className = fetcherConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -76,10 +77,11 @@ class ResourceManager(graph: S2GraphLike, } } - def getOrElseUpdateEdgeFetcher(label: Label, forceUpdate: Boolean = false): Option[EdgeFetcher] = { + def getOrElseUpdateEdgeFetcher(label: Label, + cacheTTLInSecs: Option[Int] = None): Option[EdgeFetcher] = { val cacheKey = EdgeFetcherKey + "_" + label.label - cache.withCache(cacheKey, false, forceUpdate) { + cache.withCache(cacheKey, false, cacheTTLInSecs) { label.toFetcherConfig.map { fetcherConfig => val className = fetcherConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -94,9 +96,10 @@ class ResourceManager(graph: S2GraphLike, } } - def getOrElseUpdateVertexMutator(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexMutator] = { + def getOrElseUpdateVertexMutator(column: ServiceColumn, + cacheTTLInSecs: Option[Int] = None): Option[VertexMutator] = { val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName - cache.withCache(cacheKey, false, forceUpdate) { + cache.withCache(cacheKey, false, cacheTTLInSecs) { column.toMutatorConfig.map { mutatorConfig => val className = mutatorConfig.getString(ClassNameKey) val fetcher = Class.forName(className) @@ -111,9 +114,10 @@ class ResourceManager(graph: S2GraphLike, } } - def getOrElseUpdateEdgeMutator(label: Label, forceUpdate: Boolean = false): Option[EdgeMutator] = { + def getOrElseUpdateEdgeMutator(label: Label, + cacheTTLInSecs: Option[Int] = None): Option[EdgeMutator] = { val cacheKey = EdgeMutatorKey + "_" + label.label - 
cache.withCache(cacheKey, false, forceUpdate) { + cache.withCache(cacheKey, false, cacheTTLInSecs) { label.toMutatorConfig.map { mutatorConfig => val className = mutatorConfig.getString(ClassNameKey) val fetcher = Class.forName(className) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index 51e45c2..3ecb02f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -86,7 +86,7 @@ class SafeUpdateCache(val config: Config) import java.lang.{Long => JLong} import SafeUpdateCache._ val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey) - val ttl = config.getInt(SafeUpdateCache.TtlKey) + val systemTtl = config.getInt(SafeUpdateCache.TtlKey) private val cache = CacheBuilder.newBuilder().maximumSize(maxSize) .build[JLong, (AnyRef, Int, AtomicBoolean)]() @@ -116,7 +116,7 @@ class SafeUpdateCache(val config: Config) def withCache[T <: AnyRef](key: String, broadcast: Boolean, - forceUpdate: Boolean = false)(op: => T): T = { + cacheTTLInSecs: Option[Int] = None)(op: => T): T = { val cacheKey = toCacheKey(key) val cachedValWithTs = cache.getIfPresent(cacheKey) @@ -129,7 +129,10 @@ class SafeUpdateCache(val config: Config) val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs val cachedVal = _cachedVal.asInstanceOf[T] - if (!forceUpdate && toTs() < updatedAt + ttl) cachedVal // in cache TTL + val ttl = cacheTTLInSecs.getOrElse(systemTtl) + val isValidCacheVal = toTs() < updatedAt + ttl + + if (isValidCacheVal) cachedVal // in cache TTL else { val running = isUpdating.getAndSet(true)
