Repository: incubator-s2graph
Updated Branches:
  refs/heads/master fd8119bc9 -> e71264d0d


[S2GRAPH-50]: Provide new HBase Storage Schema.

  add schema_v4(Tall row hbase schema).

JIRA:
  [S2GRAPH-50] https://issues.apache.org/jira/browse/S2GRAPH-50

Pull Request:
  Closes #35


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e71264d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e71264d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e71264d0

Branch: refs/heads/master
Commit: e71264d0dc0e7ac6faaa3ceaa698e8cd04b005da
Parents: fd8119b
Author: DO YUNG YOON <[email protected]>
Authored: Mon Mar 7 11:19:59 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Mar 7 11:19:59 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../com/kakao/s2graph/core/QueryParam.scala     | 15 ++++
 .../com/kakao/s2graph/core/QueryResult.scala    |  1 +
 .../kakao/s2graph/core/rest/RequestParser.scala |  2 +
 .../kakao/s2graph/core/storage/Storage.scala    | 20 +++--
 .../core/storage/hbase/AsynchbaseStorage.scala  | 88 ++++++++++++++++----
 .../kakao/s2graph/core/types/HBaseType.scala    |  1 +
 .../kakao/s2graph/core/types/InnerValLike.scala | 20 ++---
 .../core/Integrate/IntegrateCommon.scala        |  2 +-
 .../core/Integrate/StrongLabelDeleteTest.scala  | 22 ++---
 .../core/Integrate/WeakLabelDeleteTest.scala    | 19 +++--
 .../s2graph/core/TestCommonWithModels.scala     | 53 +++++++++++-
 .../core/storage/hbase/IndexEdgeTest.scala      | 81 ++++++++++++++++++
 13 files changed, 271 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a91718f..4c3eadb 100644
--- a/CHANGES
+++ b/CHANGES
@@ -16,6 +16,8 @@ Release 0.12.1 - unreleased
   
     S2GRAPH-33: Support weighted sum of multiple query results (Committed by 
DOYUNG YOON).
 
+    S2GRAPH-50: Provide new HBase Storage Schema (Committed by DOYUNG YOON).
+
   IMPROVEMENT
 
     S2GRAPH-14: Abstract HBase specific methods in Management and Label 
(Committed by DOYUNG YOON).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
index 184cd08..0effa07 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
@@ -99,6 +99,15 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
     val hash = MurmurHash3.stringHash(templateId().toString())
     JsNumber(hash)
   }
+
+  def cursorStrings(): Seq[Seq[String]] = {
+    //Don`t know how to replace all cursor keys in json
+    steps.map { step =>
+      step.queryParams.map { queryParam =>
+        queryParam.cursorOpt.getOrElse("")
+      }
+    }
+  }
 }
 
 object EdgeTransformer {
@@ -296,6 +305,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
   var exclude = false
   var include = false
   var shouldNormalize= false
+  var cursorOpt: Option[String] = None
 
   var columnRangeFilterMinBytes = Array.empty[Byte]
   var columnRangeFilterMaxBytes = Array.empty[Byte]
@@ -493,6 +503,11 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
     this
   }
 
+  def cursorOpt(cursorOpt: Option[String]): QueryParam = {
+    this.cursorOpt = cursorOpt
+    this
+  }
+
   def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined
 
   override def toString = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
index 0ca92b5..02d9736 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
@@ -35,6 +35,7 @@ case class QueryRequest(query: Query,
 
 
 case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
+                       tailCursor: Array[Byte] = Array.empty,
                        timestamp: Long = System.currentTimeMillis(),
                        isFailure: Boolean = false)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index 22fbd8b..f8129db 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -379,6 +379,7 @@ class RequestParser(config: Config) extends JSONParser {
       val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
       val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
       val shouldNormalize = (labelGroup \ 
"normalize").asOpt[Boolean].getOrElse(false)
+      val cursorOpt = (labelGroup \ "cursor").asOpt[String]
       // FIXME: Order of command matter
       QueryParam(labelWithDir)
         .sample(sample)
@@ -402,6 +403,7 @@ class RequestParser(config: Config) extends JSONParser {
         .transformer(transformer)
         .scorePropagateOp(scorePropagateOp)
         .shouldNormalize(shouldNormalize)
+        .cursorOpt(cursorOpt)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index cc8f13c..8789502 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -38,6 +38,7 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
    * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do 
not use this. this exist only for backward compatibility issue |
    * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do 
not use this. this exist only for backward compatibility issue |
    * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | 
recommended with HBase. current stable schema |
+   * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | 
experimental schema. use scanner instead of get |
    *
    */
 
@@ -47,10 +48,10 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
    * @param snapshotEdge: snapshotEdge to serialize
    * @return serializer implementation for StorageSerializable which has 
toKeyValues return Seq[SKeyValue]
    */
-  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = {
+  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): 
Serializable[SnapshotEdge] = {
     snapshotEdge.schemaVer match {
       case VERSION1 | VERSION2 => new 
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
-      case VERSION3 => new 
serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
+      case VERSION3 | VERSION4 => new 
serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
       case _ => throw new RuntimeException(s"not supported version: 
${snapshotEdge.schemaVer}")
     }
   }
@@ -60,9 +61,10 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
    * @param indexEdge: indexEdge to serialize
    * @return serializer implementation
    */
-  def indexEdgeSerializer(indexEdge: IndexEdge) = {
+  def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
     indexEdge.schemaVer match {
       case VERSION1 | VERSION2 | VERSION3 => new 
indexedge.wide.IndexEdgeSerializable(indexEdge)
+      case VERSION4 => new indexedge.tall.IndexEdgeSerializable(indexEdge)
       case _ => throw new RuntimeException(s"not supported version: 
${indexEdge.schemaVer}")
 
     }
@@ -85,19 +87,21 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
    * then that storage implementation is responsible to provide implicit type 
conversion method on CanSKeyValue.
    * */
 
-  val snapshotEdgeDeserializers = Map(
+  val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = 
Map(
     VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
     VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
-    VERSION3 -> new tall.SnapshotEdgeDeserializable
+    VERSION3 -> new tall.SnapshotEdgeDeserializable,
+    VERSION4 -> new tall.SnapshotEdgeDeserializable
   )
   def snapshotEdgeDeserializer(schemaVer: String) =
     snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
-  val indexEdgeDeserializers = Map(
+  val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map(
     VERSION1 -> new indexedge.wide.IndexEdgeDeserializable,
     VERSION2 -> new indexedge.wide.IndexEdgeDeserializable,
-    VERSION3 -> new indexedge.wide.IndexEdgeDeserializable
+    VERSION3 -> new indexedge.wide.IndexEdgeDeserializable,
+    VERSION4 -> new indexedge.tall.IndexEdgeDeserializable
   )
 
   def indexEdgeDeserializer(schemaVer: String) =
@@ -522,7 +526,7 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
     } yield {
         val label = queryRequest.queryParam.label
         label.schemaVersion match {
-          case HBaseType.VERSION3 =>
+          case HBaseType.VERSION3 | HBaseType.VERSION4 =>
             if (label.consistencyLevel == "strong") {
               /**
                * read: snapshotEdge on queryResult = O(N)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 7c05aed..8441c6b 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -23,7 +23,7 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future, duration}
 import scala.util.hashing.MurmurHash3
 import java.util
-
+import java.util.Base64
 
 
 object AsynchbaseStorage {
@@ -174,21 +174,73 @@ class AsynchbaseStorage(override val config: 
Config)(implicit ec: ExecutionConte
     }
 
     val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
-    val get =
-      if (queryParam.tgtVertexInnerIdOpt.isDefined) new 
GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
-      else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
 
-    get.maxVersions(1)
-    get.setFailfast(true)
-    get.setMaxResultsPerColumnFamily(queryParam.limit)
-    get.setRowOffsetPerColumnFamily(queryParam.offset)
-    get.setMinTimestamp(minTs)
-    get.setMaxTimestamp(maxTs)
-    get.setTimeout(queryParam.rpcTimeoutInMillis)
+    label.schemaVersion match {
+      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
+        val scanner = client.newScanner(label.hbaseTableName.getBytes)
+        scanner.setFamily(edgeCf)
+
+        /**
+         * TODO: remove this part.
+         */
+        val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => 
edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
+        val indexEdge = indexEdgeOpt.getOrElse(throw new 
RuntimeException(s"Can`t find index for query $queryParam"))
+
+        val srcIdBytes = 
VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+        val labelWithDirBytes = indexEdge.labelWithDir.bytes
+        val labelIndexSeqWithIsInvertedBytes = 
StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, 
isInverted = false)
+        //        val labelIndexSeqWithIsInvertedStopBytes =  
StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, 
isInverted = true)
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, 
Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
+        val (startKey, stopKey) =
+          if (queryParam.columnRangeFilter != null) {
+            // interval is set.
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), 
Array.fill(1)(0))
+              case None => Bytes.add(baseKey, 
queryParam.columnRangeFilterMinBytes)
+            }
+            (_startKey, Bytes.add(baseKey, 
queryParam.columnRangeFilterMaxBytes))
+          } else {
+            /**
+             * note: since propsToBytes encode size of property map at first 
byte, we are sure about max value here
+             */
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), 
Array.fill(1)(0))
+              case None => baseKey
+            }
+            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+          }
+//                logger.debug(s"[StartKey]: ${startKey.toList}")
+//                logger.debug(s"[StopKey]: ${stopKey.toList}")
+
+        scanner.setStartKey(startKey)
+        scanner.setStopKey(stopKey)
+
+        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: 
$queryParam")
 
-    if (queryParam.columnRangeFilter != null) 
get.setFilter(queryParam.columnRangeFilter)
+        scanner.setMaxVersions(1)
+        scanner.setMaxNumRows(queryParam.limit)
+        scanner.setMaxTimestamp(maxTs)
+        scanner.setMinTimestamp(minTs)
+        scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
+        // SET option for this rpc properly.
+        scanner
+      case _ =>
+        val get =
+          if (queryParam.tgtVertexInnerIdOpt.isDefined) new 
GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
+          else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
+
+        get.maxVersions(1)
+        get.setFailfast(true)
+        get.setMaxResultsPerColumnFamily(queryParam.limit)
+        get.setRowOffsetPerColumnFamily(queryParam.offset)
+        get.setMinTimestamp(minTs)
+        get.setMaxTimestamp(maxTs)
+        get.setTimeout(queryParam.rpcTimeoutInMillis)
 
-    get
+        if (queryParam.columnRangeFilter != null) 
get.setFilter(queryParam.columnRangeFilter)
+
+        get
+    }
   }
 
   /**
@@ -212,13 +264,13 @@ class AsynchbaseStorage(override val config: 
Config)(implicit ec: ExecutionConte
         val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
           sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
         } else edgeWithScores
-        QueryResult(resultEdgesWithScores)
-        //        QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+        QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
+//        QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty)))
 
       } recoverWith { ex =>
         logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
         QueryResult(isFailure = true)
-        //        QueryRequestWithResult(queryRequest, QueryResult(isFailure = 
true))
+//        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
       }
     }
 
@@ -232,7 +284,7 @@ class AsynchbaseStorage(override val config: 
Config)(implicit ec: ExecutionConte
         val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, 
toCacheKeyBytes(request))
         val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
         futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
-      }
+    }
     defer withCallback { queryResult => QueryRequestWithResult(queryRequest, 
queryResult)}
   }
 
@@ -478,4 +530,4 @@ class AsynchbaseStorage(override val config: 
Config)(implicit ec: ExecutionConte
   private def getEndKey(regionCount: Int): Array[Byte] = {
     Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
index 62af1d7..4b3b3db 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
@@ -6,6 +6,7 @@ import org.apache.hadoop.hbase.util.Bytes
  * Created by shon on 6/6/15.
  */
 object HBaseType {
+  val VERSION4 = "v4"
   val VERSION3 = "v3"
   val VERSION2 = "v2"
   val VERSION1 = "v1"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
index 8209462..8146f32 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
@@ -66,7 +66,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
                 version: String,
                 isVertexId: Boolean): (InnerValLike, Int) = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal.fromBytes(bytes, offset, len, 
version, isVertexId)
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, 
offset, len, version, isVertexId)
       case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, 
isVertexId)
       case _ => throw notSupportedEx(version)
     }
@@ -74,7 +74,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
 
   def withLong(l: Long, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(l))
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l))
       case VERSION1 => v1.InnerVal(Some(l), None, None)
       case _ => throw notSupportedEx(version)
     }
@@ -82,7 +82,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
 
   def withInt(i: Int, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(i))
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i))
       case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
@@ -90,7 +90,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
 
   def withFloat(f: Float, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(f.toDouble))
+      case VERSION2 | VERSION3 | VERSION4 => 
v2.InnerVal(BigDecimal(f.toDouble))
       case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
@@ -98,7 +98,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
 
   def withDouble(d: Double, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(d))
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d))
       case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
@@ -106,7 +106,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
 
   def withNumber(num: BigDecimal, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(num)
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num)
       case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
@@ -114,7 +114,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
 
   def withBoolean(b: Boolean, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(b)
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b)
       case VERSION1 => v1.InnerVal(None, None, Some(b))
       case _ => throw notSupportedEx(version)
     }
@@ -122,14 +122,14 @@ object InnerVal extends HBaseDeserializableWithIsVertexId 
{
 
   def withBlob(blob: Array[Byte], version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(blob)
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(blob)
       case _ => throw notSupportedEx(version)
     }
   }
 
   def withStr(s: String, version: String): InnerValLike = {
     version match {
-      case VERSION2 | VERSION3 => v2.InnerVal(s)
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s)
       case VERSION1 => v1.InnerVal(None, Some(s), None)
       case _ => throw notSupportedEx(version)
     }
@@ -147,7 +147,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   /** nasty implementation for backward compatability */
   def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: 
String): InnerValLike = {
     val ret = toVersion match {
-      case VERSION2 | VERSION3 =>
+      case VERSION2 | VERSION3 | VERSION4 =>
         if (innerVal.isInstanceOf[v1.InnerVal]) {
           val obj = innerVal.asInstanceOf[v1.InnerVal]
           obj.valueType match {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
index ae9e514..f8bf7af 100644
--- 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
+++ 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
@@ -185,7 +185,7 @@ trait IntegrateCommon extends FunSuite with Matchers with 
BeforeAndAfterAll {
     }
     ],
     "consistencyLevel": "strong",
-    "schemaVersion": "v2",
+    "schemaVersion": "v4",
     "compressionAlgorithm": "gz",
     "hTableName": "$testHTableName"
   }"""

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
index aae108e..f4da49d 100644
--- 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
+++ 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
@@ -198,13 +198,13 @@ class StrongLabelDeleteTest extends IntegrateCommon {
   object StrongDeleteUtil {
 
     val labelName = testLabelName2
+//    val labelName = testLabelName
     val maxTgtId = 10
     val batchSize = 10
     val testNum = 100
     val numOfBatch = 10
 
     def testInner(startTs: Long, src: Long) = {
-      val labelName = testLabelName2
       val lastOps = Array.fill(maxTgtId)("none")
       var currentTs = startTs
 
@@ -245,18 +245,18 @@ class StrongLabelDeleteTest extends IntegrateCommon {
     }
 
     def bulkEdges(startTs: Int = 0) = Seq(
-      toEdge(startTs + 1, "insert", "e", "0", "1", testLabelName2, 
s"""{"time": 10}"""),
-      toEdge(startTs + 2, "insert", "e", "0", "1", testLabelName2, 
s"""{"time": 11}"""),
-      toEdge(startTs + 3, "insert", "e", "0", "1", testLabelName2, 
s"""{"time": 12}"""),
-      toEdge(startTs + 4, "insert", "e", "0", "2", testLabelName2, 
s"""{"time": 10}"""),
-      toEdge(startTs + 5, "insert", "e", "10", "20", testLabelName2, 
s"""{"time": 10}"""),
-      toEdge(startTs + 6, "insert", "e", "10", "21", testLabelName2, 
s"""{"time": 11}"""),
-      toEdge(startTs + 7, "insert", "e", "11", "20", testLabelName2, 
s"""{"time": 12}"""),
-      toEdge(startTs + 8, "insert", "e", "12", "20", testLabelName2, 
s"""{"time": 13}""")
+      toEdge(startTs + 1, "insert", "e", "0", "1", labelName, s"""{"time": 
10}"""),
+      toEdge(startTs + 2, "insert", "e", "0", "1", labelName, s"""{"time": 
11}"""),
+      toEdge(startTs + 3, "insert", "e", "0", "1", labelName, s"""{"time": 
12}"""),
+      toEdge(startTs + 4, "insert", "e", "0", "2", labelName, s"""{"time": 
10}"""),
+      toEdge(startTs + 5, "insert", "e", "10", "20", labelName, s"""{"time": 
10}"""),
+      toEdge(startTs + 6, "insert", "e", "10", "21", labelName, s"""{"time": 
11}"""),
+      toEdge(startTs + 7, "insert", "e", "11", "20", labelName, s"""{"time": 
12}"""),
+      toEdge(startTs + 8, "insert", "e", "12", "20", labelName, s"""{"time": 
13}""")
     )
 
     def query(id: Long, serviceName: String = testServiceName, columnName: 
String = testColumnName,
-              labelName: String = testLabelName2, direction: String = "out") = 
Json.parse(
+              _labelName: String = labelName, direction: String = "out") = 
Json.parse(
       s"""
           { "srcVertices": [
             { "serviceName": "$serviceName",
@@ -265,7 +265,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
              }],
             "steps": [
             [ {
-                "label": "$labelName",
+                "label": "${_labelName}",
                 "direction": "${direction}",
                 "offset": 0,
                 "limit": -1,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index b80d9c7..2028c44 100644
--- 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -39,12 +39,19 @@ class WeakLabelDeleteTest extends IntegrateCommon with 
BeforeAndAfterEach {
     (result \ "results").as[List[JsValue]].size should be(0)
 
     /** insert should be ignored */
-    val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert")
-    val rets2 = graph.mutateEdges(edgesToStore2, withWait = true)
-    Await.result(rets2, Duration(20, TimeUnit.MINUTES))
-
-    result = getEdgesSync(query(0))
-    (result \ "results").as[List[JsValue]].size should be(0)
+    /**
+     * I am wondering if this is right test case
+     * This makes sense because hbase think cell is deleted when there are
+     * insert/delete with same timestamp(version) on same cell.
+     * This can be different on different storage system so I think
+     * this test should be removed.
+     */
+//    val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert")
+//    val rets2 = graph.mutateEdges(edgesToStore2, withWait = true)
+//    Await.result(rets2, Duration(20, TimeUnit.MINUTES))
+//
+//    result = getEdgesSync(query(0))
+//    (result \ "results").as[List[JsValue]].size should be(0)
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala 
b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
index 4fa7b59..f7c4bc1 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
@@ -42,20 +42,36 @@ trait TestCommonWithModels {
 
   val serviceName = "_test_service"
   val serviceNameV2 = "_test_service_v2"
+  val serviceNameV3 = "_test_service_v3"
+  val serviceNameV4 = "_test_service_v4"
+
   val columnName = "user_id"
   val columnNameV2 = "user_id_v2"
+  val columnNameV3 = "user_id_v3"
+  val columnNameV4 = "user_id_v4"
+
   val columnType = "long"
   val columnTypeV2 = "long"
+  val columnTypeV3 = "long"
+  val columnTypeV4 = "long"
 
   val tgtColumnName = "itme_id"
   val tgtColumnNameV2 = "item_id_v2"
+  val tgtColumnNameV3 = "item_id_v3"
+  val tgtColumnNameV4 = "item_id_v4"
+
   val tgtColumnType = "string"
   val tgtColumnTypeV2 = "string"
+  val tgtColumnTypeV3 = "string"
+  val tgtColumnTypeV4 = "string"
 
   val hTableName = "_test_cases"
   val preSplitSize = 0
+
   val labelName = "_test_label"
   val labelNameV2 = "_test_label_v2"
+  val labelNameV3 = "_test_label_v3"
+  val labelNameV4 = "_test_label_v4"
 
   val undirectedLabelName = "_test_label_undirected"
   val undirectedLabelNameV2 = "_test_label_undirected_v2"
@@ -79,12 +95,16 @@ trait TestCommonWithModels {
     implicit val session = AutoSession
     management.createService(serviceName, cluster, hTableName, preSplitSize, 
hTableTTL = None, "gz")
     management.createService(serviceNameV2, cluster, hTableName, preSplitSize, 
hTableTTL = None, "gz")
+    management.createService(serviceNameV3, cluster, hTableName, preSplitSize, 
hTableTTL = None, "gz")
+    management.createService(serviceNameV4, cluster, hTableName, preSplitSize, 
hTableTTL = None, "gz")
   }
 
   def deleteTestService() = {
     implicit val session = AutoSession
     Management.deleteService(serviceName)
     Management.deleteService(serviceNameV2)
+    Management.deleteService(serviceNameV3)
+    Management.deleteService(serviceNameV4)
   }
 
   def deleteTestLabel() = {
@@ -103,8 +123,14 @@ trait TestCommonWithModels {
     management.createLabel(labelNameV2, serviceNameV2, columnNameV2, 
columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = true, serviceNameV2, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
 
+    management.createLabel(labelNameV3, serviceNameV3, columnNameV3, 
columnTypeV3, serviceNameV3, tgtColumnNameV3, tgtColumnTypeV3,
+      isDirected = true, serviceNameV3, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")
+
+    management.createLabel(labelNameV4, serviceNameV4, columnNameV4, 
columnTypeV4, serviceNameV4, tgtColumnNameV4, tgtColumnTypeV4,
+      isDirected = true, serviceNameV4, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4")
+
     management.createLabel(undirectedLabelName, serviceName, columnName, 
columnType, serviceName, tgtColumnName, tgtColumnType,
-      isDirected = false, serviceName, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
+      isDirected = false, serviceName, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")
 
     management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, 
columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = false, serviceName, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
@@ -114,18 +140,34 @@ trait TestCommonWithModels {
 
   def serviceV2 = Service.findByName(serviceNameV2, useCache = false).get
 
+  def serviceV3 = Service.findByName(serviceNameV3, useCache = false).get
+
+  def serviceV4 = Service.findByName(serviceNameV4, useCache = false).get
+
   def column = ServiceColumn.find(service.id.get, columnName, useCache = 
false).get
 
   def columnV2 = ServiceColumn.find(serviceV2.id.get, columnNameV2, useCache = 
false).get
 
+  def columnV3 = ServiceColumn.find(serviceV3.id.get, columnNameV3, useCache = 
false).get
+
+  def columnV4 = ServiceColumn.find(serviceV4.id.get, columnNameV4, useCache = 
false).get
+
   def tgtColumn = ServiceColumn.find(service.id.get, tgtColumnName, useCache = 
false).get
 
   def tgtColumnV2 = ServiceColumn.find(serviceV2.id.get, tgtColumnNameV2, 
useCache = false).get
 
+  def tgtColumnV3 = ServiceColumn.find(serviceV3.id.get, tgtColumnNameV3, 
useCache = false).get
+
+  def tgtColumnV4 = ServiceColumn.find(serviceV4.id.get, tgtColumnNameV4, 
useCache = false).get
+
   def label = Label.findByName(labelName, useCache = false).get
 
   def labelV2 = Label.findByName(labelNameV2, useCache = false).get
 
+  def labelV3 = Label.findByName(labelNameV3, useCache = false).get
+
+  def labelV4 = Label.findByName(labelNameV4, useCache = false).get
+
   def undirectedLabel = Label.findByName(undirectedLabelName, useCache = 
false).get
 
   def undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = 
false).get
@@ -140,7 +182,16 @@ trait TestCommonWithModels {
 
   def labelWithDirV2 = LabelWithDirection(labelV2.id.get, dir)
 
+  def labelWithDirV3 = LabelWithDirection(labelV3.id.get, dir)
+
+  def labelWithDirV4 = LabelWithDirection(labelV4.id.get, dir)
+
   def queryParam = QueryParam(labelWithDir)
 
   def queryParamV2 = QueryParam(labelWithDirV2)
+
+  def queryParamV3 = QueryParam(labelWithDirV3)
+
+  def queryParamV4 = QueryParam(labelWithDirV4)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala
 
b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala
new file mode 100644
index 0000000..e68fc20
--- /dev/null
+++ 
b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -0,0 +1,81 @@
+package com.kakao.s2graph.core.storage.hbase
+
+import com.kakao.s2graph.core.mysqls.{Label, LabelMeta, LabelIndex}
+import com.kakao.s2graph.core.{IndexEdge, Vertex, TestCommonWithModels}
+import com.kakao.s2graph.core.types._
+import org.scalatest.{FunSuite, Matchers}
+
+
+class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels {
+  initTests()
+
+  /**
+   * check if storage serializer/deserializer can translate from/to bytes 
array.
+   * @param l: label for edge.
+   * @param ts: timestamp for edge.
+   * @param to: to VertexId for edge.
+   * @param props: expected props of edge.
+   */
+  def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, 
InnerValLike]): Unit = {
+    val from = InnerVal.withLong(1, l.schemaVersion)
+    val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from)
+    val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to)
+    val vertex = Vertex(vertexId, ts)
+    val tgtVertex = Vertex(tgtVertexId, ts)
+    val labelWithDir = LabelWithDirection(l.id.get, 0)
+
+    val indexEdge = IndexEdge(vertex, tgtVertex, labelWithDir, 0, ts, 
LabelIndex.DefaultSeq, props)
+    val _indexEdgeOpt = 
graph.storage.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(queryParam,
+      graph.storage.indexEdgeSerializer(indexEdge).toKeyValues, 
l.schemaVersion, None)
+
+    _indexEdgeOpt should not be empty
+    indexEdge should be(_indexEdgeOpt.get)
+  }
+
+
+  /** note that props have to be properly set up for equals */
+  test("test serializer/deserializer for index edge.") {
+    val ts = System.currentTimeMillis()
+    for {
+      l <- Seq(label, labelV2, labelV3, labelV4)
+    } {
+      val to = InnerVal.withLong(101, l.schemaVersion)
+      val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion)
+      val props = Map(LabelMeta.timeStampSeq -> tsInnerVal,
+        1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion))
+
+      check(l, ts, to, props)
+    }
+  }
+
+  test("test serializer/deserializer for degree edge.") {
+    val ts = System.currentTimeMillis()
+    for {
+      l <- Seq(label, labelV2, labelV3, labelV4)
+    } {
+      val to = InnerVal.withStr("0", l.schemaVersion)
+      val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion)
+      val props = Map(
+        LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion),
+        LabelMeta.timeStampSeq -> tsInnerVal)
+
+      check(l, ts, to, props)
+    }
+  }
+
+  test("test serializer/deserializer for incrementCount index edge.") {
+    val ts = System.currentTimeMillis()
+    for {
+      l <- Seq(label, labelV2, labelV3, labelV4)
+    } {
+      val to = InnerVal.withLong(101, l.schemaVersion)
+
+      val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion)
+      val props = Map(LabelMeta.timeStampSeq -> tsInnerVal,
+        1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion),
+        LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion))
+
+      check(l, ts, to, props)
+    }
+  }
+}
\ No newline at end of file


Reply via email to