Repository: incubator-s2graph
Updated Branches:
  refs/heads/master a8539c2b6 -> a1d91ee02


[S2GRAPH-65]: Deferred produce exception.
  Fix type cast bug: futureCache and fetchInner now use QueryRequestWithResult instead of QueryResult.

JIRA:
  [S2GRAPH-65] https://issues.apache.org/jira/browse/S2GRAPH-65

Pull Request:
  Closes #59

Authors:
  DOYUNG YOON: steams...@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a1d91ee0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a1d91ee0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a1d91ee0

Branch: refs/heads/master
Commit: a1d91ee02211cbb3e9ef9512846492b060d29ba5
Parents: a8539c2
Author: DO YUNG YOON <steams...@apache.org>
Authored: Fri Jun 17 09:51:56 2016 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Fri Jun 17 09:51:56 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 ++
 .../core/storage/hbase/AsynchbaseStorage.scala  | 25 ++++++++++----------
 2 files changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a1d91ee0/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c8cf738..436cca4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -94,6 +94,8 @@ Release 0.12.1 - unreleased
     S2GRAPH-19: When query with duration error (Committed by DOYUNG YOON).
     
     S2GRAPH-63: Condition on partition strong edges and weak edges on 
mutateEdges is wrong (Committed by DOYUNG YOON).
+   
+    S2GRAPH-65: Deferred produce exception (Committed by DOYUNG YOON).
 
     S2GRAPH-64: incrementCounts yield type case exception (Committed by DOYUNG 
YOON).
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a1d91ee0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index b5d94e6..c0c369b 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -99,7 +99,7 @@ class AsynchbaseStorage(override val config: Config)(implicit 
ec: ExecutionConte
   private def client(withWait: Boolean): HBaseClient = if (withWait) 
clientWithFlush else client
 
   /** Future Cache to squash request */
-  private val futureCache = new DeferCache[QueryResult](config)(ec)
+  private val futureCache = new DeferCache[QueryRequestWithResult](config)(ec)
 
   /** Simple Vertex Cache */
   private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
@@ -277,19 +277,19 @@ class AsynchbaseStorage(override val config: 
Config)(implicit ec: ExecutionConte
                      isInnerCall: Boolean,
                      parentEdges: Seq[EdgeWithScore]): 
Deferred[QueryRequestWithResult] = {
 
-    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
+    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryRequestWithResult] = {
       fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
         val edgeWithScores = toEdges(kvs, queryRequest.queryParam, 
prevStepScore, isInnerCall, parentEdges)
         val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
           sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
         } else edgeWithScores
-        QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-//        QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+//        QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
+        QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty)))
 
       } recoverWith { ex =>
         logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        QueryResult(isFailure = true)
-//        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+//        QueryResult(isFailure = true)
+        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
       }
     }
 
@@ -297,14 +297,13 @@ class AsynchbaseStorage(override val config: 
Config)(implicit ec: ExecutionConte
     val cacheTTL = queryParam.cacheTTLInMillis
     val request = buildRequest(queryRequest)
 
-    val defer =
-      if (cacheTTL <= 0) fetchInner(request)
-      else {
-        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, 
toCacheKeyBytes(request))
-        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+
+    if (cacheTTL <= 0) fetchInner(request)
+    else {
+      val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, 
toCacheKeyBytes(request))
+      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
     }
-    defer withCallback { queryResult => QueryRequestWithResult(queryRequest, 
queryResult)}
   }
 
 

Reply via email to