Repository: incubator-s2graph Updated Branches: refs/heads/master a8539c2b6 -> a1d91ee02
[S2GRAPH-65]: Deferred produce exception. fix type cast bug. JIRA: [S2GRAPH-65] https://issues.apache.org/jira/browse/S2GRAPH-65 Pull Request: Closes #59 Authors: DOYUNG YOON: steams...@apache.org Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a1d91ee0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a1d91ee0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a1d91ee0 Branch: refs/heads/master Commit: a1d91ee02211cbb3e9ef9512846492b060d29ba5 Parents: a8539c2 Author: DO YUNG YOON <steams...@apache.org> Authored: Fri Jun 17 09:51:56 2016 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Fri Jun 17 09:51:56 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../core/storage/hbase/AsynchbaseStorage.scala | 25 ++++++++++---------- 2 files changed, 14 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a1d91ee0/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c8cf738..436cca4 100644 --- a/CHANGES +++ b/CHANGES @@ -94,6 +94,8 @@ Release 0.12.1 - unreleased S2GRAPH-19: When query with duration error (Committed by DOYUNG YOON). S2GRAPH-63: Condition on partition strong edges and weak edges on mutateEdges is wrong (Committed by DOYUNG YOON). + + S2GRAPH-65: Deferred produce exception (Committed by DOYUNG YOON). S2GRAPH-64: incrementCounts yield type case exception (Committed by DOYUNG YOON). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a1d91ee0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index b5d94e6..c0c369b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -99,7 +99,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client /** Future Cache to squash request */ - private val futureCache = new DeferCache[QueryResult](config)(ec) + private val futureCache = new DeferCache[QueryRequestWithResult](config)(ec) /** Simple Vertex Cache */ private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec) @@ -277,19 +277,19 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = { - def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = { + def fetchInner(hbaseRpc: AnyRef): Deferred[QueryRequestWithResult] = { fetchKeyValuesInner(hbaseRpc).withCallback { kvs => val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) { sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) } else edgeWithScores - QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte])) -// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) +// QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte])) + QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) } recoverWith { ex => logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex) - QueryResult(isFailure = true) -// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) +// QueryResult(isFailure = true) + QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) } } @@ -297,14 +297,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val cacheTTL = queryParam.cacheTTLInMillis val request = buildRequest(queryRequest) - val defer = - if (cacheTTL <= 0) fetchInner(request) - else { - val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) - val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) + + if (cacheTTL <= 0) fetchInner(request) + else { + val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) + val cacheKey = queryParam.toCacheKey(cacheKeyBytes) + futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) } - defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)} }