Repository: incubator-s2graph Updated Branches: refs/heads/master 69273249a -> 09a7919de
[S2GRAPH-82]: Merge DeferCache and FutureCache. JIRA: [S2GRAPH-82] https://issues.apache.org/jira/browse/S2GRAPH-82 Pull Request: Closes #54 Authors: Daewon Jeong: <blue...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/09a7919d Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/09a7919d Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/09a7919d Branch: refs/heads/master Commit: 09a7919de88571e2b2a6924ef3d28fcc69b8d593 Parents: 6927324 Author: DO YUNG YOON <steams...@apache.org> Authored: Thu Sep 1 06:35:13 2016 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Sep 1 06:35:13 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + s2core/src/main/resources/reference.conf | 1 + .../scala/org/apache/s2graph/core/Graph.scala | 1 + .../core/storage/hbase/AsynchbaseStorage.scala | 33 ++-- .../apache/s2graph/core/utils/DeferCache.scala | 162 +++++++++++++++---- .../apache/s2graph/core/utils/FutureCache.scala | 101 ------------ .../org/apache/s2graph/core/utils/Logger.scala | 3 + 7 files changed, 154 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 28cfbcd..ebb11c6 100644 --- a/CHANGES +++ b/CHANGES @@ -74,6 +74,8 @@ Release 0.12.1 - unreleased S2GRAPH-70: Automate the process of building a distribution package (Contributed by Jong Wook Kim<jongw...@nyu.edu>, committed by DOYUNG YOON) + S2GRAPH-82: Merge DeferCache and FutureCache (Committed by Daewon Jeong). + BUG FIXES S2GRAPH-18: Query Option "interval" is Broken. http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/resources/reference.conf ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/reference.conf b/s2core/src/main/resources/reference.conf index 86dfa67..e5c129b 100644 --- a/s2core/src/main/resources/reference.conf +++ b/s2core/src/main/resources/reference.conf @@ -39,6 +39,7 @@ max.back.off=50 future.cache.max.size=100000 future.cache.expire.after.write=10000 future.cache.expire.after.access=5000 +future.cache.metric.interval=60000 # Local Cache cache.ttl.seconds=60 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala index aee0e95..e5aa6eb 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -61,6 +61,7 @@ object Graph { "future.cache.max.size" -> java.lang.Integer.valueOf(100000), "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), + "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), "s2graph.storage.backend" -> "hbase", "query.hardlimit" -> java.lang.Integer.valueOf(100000) ) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index c0c369b..19d5cc8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -36,13 +36,13 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types.{HBaseType, VertexId} -import org.apache.s2graph.core.utils.{DeferCache, Extensions, FutureCache, logger} +import org.apache.s2graph.core.utils._ import org.hbase.async._ import scala.collection.JavaConversions._ import scala.collection.{Map, Seq} import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future, duration} +import scala.concurrent._ import scala.util.hashing.MurmurHash3 @@ -98,11 +98,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte private val emptyKeyValues = new util.ArrayList[KeyValue]() private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client + import CanDefer._ + /** Future Cache to squash request */ - private val futureCache = new DeferCache[QueryRequestWithResult](config)(ec) + private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true) /** Simple Vertex Cache */ - private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec) + private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue]) /** @@ -277,19 +279,19 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = { - def fetchInner(hbaseRpc: AnyRef): Deferred[QueryRequestWithResult] = { + def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = { fetchKeyValuesInner(hbaseRpc).withCallback { kvs => val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) { sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) } else edgeWithScores -// QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte])) - QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) + QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte])) +// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) } recoverWith { ex => logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex) -// QueryResult(isFailure = true) - QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) + QueryResult(isFailure = true) +// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) } } @@ -297,13 +299,14 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val cacheTTL = queryParam.cacheTTLInMillis val request = buildRequest(queryRequest) - - if (cacheTTL <= 0) fetchInner(request) - else { - val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) - val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) + val defer = + if (cacheTTL <= 0) fetchInner(request) + else { + val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) + val cacheKey = queryParam.toCacheKey(cacheKeyBytes) + futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) } + defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)} } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala index 4198010..96f87ed 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,83 +19,179 @@ package org.apache.s2graph.core.utils -import java.util.concurrent.TimeUnit +import java.util.concurrent.{Executors, TimeUnit} import com.google.common.cache.CacheBuilder -import com.stumbleupon.async.Deferred +import com.stumbleupon.async.{Callback, Deferred} import com.typesafe.config.Config -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.language.higherKinds -class DeferCache[R](config: Config)(implicit ex: ExecutionContext) { +trait CanDefer[A, M[_], C[_]] { + def promise: M[A] - import Extensions.DeferOps + def future(defer: M[A]): C[A] - type Value = (Long, Deferred[R]) + def success(defer: M[A], value: A): Unit + + def failure(defer: M[A], cause: Throwable): Unit + + def onSuccess(defer: C[A])(pf: PartialFunction[A, A])(implicit ec: ExecutionContext) + + def onFailure(defer: C[A])(pf: PartialFunction[Throwable, A])(implicit ec: ExecutionContext) +} + +object CanDefer { + implicit def implFuture[A] = new CanDefer[A, Promise, Future] { + override def promise: Promise[A] = Promise[A]() + + override def future(defer: Promise[A]): Future[A] = defer.future + + override def success(defer: Promise[A], value: A) = defer.success(value) + + override def failure(defer: Promise[A], cause: Throwable) = defer.failure(cause) + + override def onSuccess(defer: Future[A])(pf: PartialFunction[A, A])(implicit ec: ExecutionContext) = defer onSuccess pf + + override def onFailure(defer: Future[A])(pf: PartialFunction[Throwable, A])(implicit ec: ExecutionContext) = defer onFailure pf + } + + implicit def implDeferred[A] = new CanDefer[A, Deferred, Deferred] { + + override def promise: Deferred[A] = new Deferred[A]() + + override def future(defer: Deferred[A]): Deferred[A] = defer + + override def success(defer: Deferred[A], value: A) = defer.callback(value) + + override def failure(defer: Deferred[A], cause: Throwable) = defer.callback(cause) + + override def onSuccess(defer: Deferred[A])(pf: PartialFunction[A, A])(implicit _ec: ExecutionContext) = + defer.addCallback(new Callback[A, A] { + override def call(arg: A): A = pf(arg) + }) + + override def onFailure(defer: Deferred[A])(pf: PartialFunction[Throwable, A])(implicit ec: ExecutionContext) = + defer.addErrback(new Callback[A, Exception] { + override def call(t: Exception): A = pf(t) + }) + } +} + +object DeferCache { + private val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() + + def addScheduleJob(delay: Long)(block: => Unit) = + scheduledThreadPool.scheduleWithFixedDelay(new Runnable { + override def run(): Unit = block + }, 1000, delay, TimeUnit.MILLISECONDS) +} + +/** + * @param config + * @param ec + * @param canDefer: implicit evidence to find out implementation of CanDefer. + * @tparam A: actual element type that will be stored in M[_] and C[_]. + * @tparam M[_]: container type that will be stored in local cache. ex) Promise, Defer. + * @tparam C[_]: container type that will be returned to client of this class. Ex) Future, Defer. + */ +class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", useMetric: Boolean = false)(implicit ec: ExecutionContext, canDefer: CanDefer[A, M, C]) { + type Value = (Long, C[A]) private val maxSize = config.getInt("future.cache.max.size") + private val metricInterval = config.getInt("future.cache.metric.interval") private val expireAfterWrite = config.getInt("future.cache.expire.after.write") private val expireAfterAccess = config.getInt("future.cache.expire.after.access") - private val futureCache = CacheBuilder.newBuilder() - .initialCapacity(maxSize) - .concurrencyLevel(Runtime.getRuntime.availableProcessors()) - .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) - .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) - .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]() - + private val futureCache = { + val builder = CacheBuilder.newBuilder() + .initialCapacity(maxSize) + .concurrencyLevel(Runtime.getRuntime.availableProcessors()) + .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) + .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) + .maximumSize(maxSize) + + if (useMetric && metricInterval > 0) { + val cache = builder.recordStats().build[java.lang.Long, (Long, M[A])]() + DeferCache.addScheduleJob(delay = metricInterval) { logger.metric(s"${name}: ${cache.stats()}") } + cache + } else { + builder.build[java.lang.Long, (Long, M[A])]() + } + } def asMap() = futureCache.asMap() - def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey) + def getIfPresent(cacheKey: Long): Value = { + val (cachedAt, promise) = futureCache.getIfPresent(cacheKey) + (cachedAt, canDefer.future(promise)) + } private def checkAndExpire(cacheKey: Long, cachedAt: Long, cacheTTL: Long, - oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = { + oldFuture: C[A])(op: => C[A]): C[A] = { if (System.currentTimeMillis() >= cachedAt + cacheTTL) { // future is too old. so need to expire and fetch new data from storage. futureCache.asMap().remove(cacheKey) - val newPromise = new Deferred[R]() + val promise = canDefer.promise val now = System.currentTimeMillis() - futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { + futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { case null => // only one thread succeed to come here concurrently // initiate fetch to storage then add callback on complete to finish promise. - op withCallback { value => - newPromise.callback(value) + val result = op + canDefer.onSuccess(result) { case value => + canDefer.success(promise, value) value } - newPromise - case (cachedAt, oldDefer) => oldDefer + + canDefer.onFailure(result) { case e: Throwable => + canDefer.failure(promise, e) + empty + } + + canDefer.future(promise) + + case (cachedAt, oldPromise) => canDefer.future(oldPromise) } } else { // future is not to old so reuse it. - oldDefer + oldFuture } } - def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = { + + def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => C[A]): C[A] = { val cacheVal = futureCache.getIfPresent(cacheKey) cacheVal match { case null => - val promise = new Deferred[R]() + val promise = canDefer.promise val now = System.currentTimeMillis() - val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { + + val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { case null => - op.withCallback { value => - promise.callback(value) + val result = op + canDefer.onSuccess(result) { case value => + canDefer.success(promise, value) value } + + canDefer.onFailure(result) { case e: Throwable => + canDefer.failure(promise, e) + empty + } + (now, promise) + case oldVal => oldVal } - checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op) + checkAndExpire(cacheKey, cacheTTL, cachedAt, canDefer.future(cachedPromise))(op) - case (cachedAt, defer) => - checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op) + case (cachedAt, cachedPromise) => + checkAndExpire(cacheKey, cacheTTL, cachedAt, canDefer.future(cachedPromise))(op) } } } - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala deleted file mode 100644 index d6c4de3..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.utils - -import java.util.concurrent.TimeUnit - -import com.google.common.cache.CacheBuilder -import com.typesafe.config.Config - -import scala.concurrent.{ExecutionContext, Future, Promise} - - -class FutureCache[R](config: Config)(implicit ex: ExecutionContext) { - - type Value = (Long, Future[R]) - - private val maxSize = config.getInt("future.cache.max.size") - private val expireAfterWrite = config.getInt("future.cache.expire.after.write") - private val expireAfterAccess = config.getInt("future.cache.expire.after.access") - - private val futureCache = CacheBuilder.newBuilder() - .initialCapacity(maxSize) - .concurrencyLevel(Runtime.getRuntime.availableProcessors()) - .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) - .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) - .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]() - - - def asMap() = futureCache.asMap() - - def getIfPresent(cacheKey: Long): Value = { - val (cachedAt, promise) = futureCache.getIfPresent(cacheKey) - (cachedAt, promise.future) - } - - private def checkAndExpire(cacheKey: Long, - cachedAt: Long, - cacheTTL: Long, - oldFuture: Future[R])(op: => Future[R]): Future[R] = { - if (System.currentTimeMillis() >= cachedAt + cacheTTL) { - // future is too old. so need to expire and fetch new data from storage. - futureCache.asMap().remove(cacheKey) - - val newPromise = Promise[R] - val now = System.currentTimeMillis() - - futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { - case null => - // only one thread succeed to come here concurrently - // initiate fetch to storage then add callback on complete to finish promise. - op.onSuccess { case value => - newPromise.success(value) - value - } - newPromise.future - case (cachedAt, oldPromise) => oldPromise.future - } - } else { - // future is not to old so reuse it. - oldFuture - } - } - def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = { - val cacheVal = futureCache.getIfPresent(cacheKey) - cacheVal match { - case null => - val promise = Promise[R] - val now = System.currentTimeMillis() - val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { - case null => - op.onSuccess { case value => - promise.success(value) - value - } - (now, promise) - case oldVal => oldVal - } - checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) - - case (cachedAt, cachedPromise) => - checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala index e37080e..4149540 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala @@ -50,6 +50,9 @@ object logger { private val logger = LoggerFactory.getLogger("application") private val errorLogger = LoggerFactory.getLogger("error") + private val metricLogger = LoggerFactory.getLogger("metrics") + + def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg)) def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg))