Repository: incubator-s2graph Updated Branches: refs/heads/master fd8119bc9 -> e71264d0d
[S2GRAPH-50]: Provide new HBase Storage Schema. add schema_v4(Tall row hbase schema). JIRA: [S2GRAPH-50] https://issues.apache.org/jira/browse/S2GRAPH-50 Pull Request: Closes #35 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e71264d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e71264d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e71264d0 Branch: refs/heads/master Commit: e71264d0dc0e7ac6faaa3ceaa698e8cd04b005da Parents: fd8119b Author: DO YUNG YOON <[email protected]> Authored: Mon Mar 7 11:19:59 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Mar 7 11:19:59 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../com/kakao/s2graph/core/QueryParam.scala | 15 ++++ .../com/kakao/s2graph/core/QueryResult.scala | 1 + .../kakao/s2graph/core/rest/RequestParser.scala | 2 + .../kakao/s2graph/core/storage/Storage.scala | 20 +++-- .../core/storage/hbase/AsynchbaseStorage.scala | 88 ++++++++++++++++---- .../kakao/s2graph/core/types/HBaseType.scala | 1 + .../kakao/s2graph/core/types/InnerValLike.scala | 20 ++--- .../core/Integrate/IntegrateCommon.scala | 2 +- .../core/Integrate/StrongLabelDeleteTest.scala | 22 ++--- .../core/Integrate/WeakLabelDeleteTest.scala | 19 +++-- .../s2graph/core/TestCommonWithModels.scala | 53 +++++++++++- .../core/storage/hbase/IndexEdgeTest.scala | 81 ++++++++++++++++++ 13 files changed, 271 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a91718f..4c3eadb 100644 --- a/CHANGES +++ b/CHANGES @@ -16,6 +16,8 @@ Release 0.12.1 - unreleased S2GRAPH-33: Support weighted sum of multiple query results (Committed by DOYUNG YOON). + S2GRAPH-50: Provide new HBase Storage Schema (Committed by DOYUNG YOON). + IMPROVEMENT S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala index 184cd08..0effa07 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala @@ -99,6 +99,15 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], val hash = MurmurHash3.stringHash(templateId().toString()) JsNumber(hash) } + + def cursorStrings(): Seq[Seq[String]] = { + //Don`t know how to replace all cursor keys in json + steps.map { step => + step.queryParams.map { queryParam => + queryParam.cursorOpt.getOrElse("") + } + } + } } object EdgeTransformer { @@ -296,6 +305,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System var exclude = false var include = false var shouldNormalize= false + var cursorOpt: Option[String] = None var columnRangeFilterMinBytes = Array.empty[Byte] var columnRangeFilterMaxBytes = Array.empty[Byte] @@ -493,6 +503,11 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System this } + def cursorOpt(cursorOpt: Option[String]): QueryParam = { + this.cursorOpt = cursorOpt + this + } + def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined override def toString = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala index 0ca92b5..02d9736 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala @@ -35,6 +35,7 @@ case class QueryRequest(query: Query, case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil, + tailCursor: Array[Byte] = Array.empty, timestamp: Long = System.currentTimeMillis(), isFailure: Boolean = false) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala index 22fbd8b..f8129db 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -379,6 +379,7 @@ class RequestParser(config: Config) extends JSONParser { val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) + val cursorOpt = (labelGroup \ "cursor").asOpt[String] // FIXME: Order of command matter QueryParam(labelWithDir) .sample(sample) @@ -402,6 +403,7 @@ class RequestParser(config: Config) extends JSONParser { .transformer(transformer) .scorePropagateOp(scorePropagateOp) .shouldNormalize(shouldNormalize) + .cursorOpt(cursorOpt) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala index cc8f13c..8789502 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala @@ -38,6 +38,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema | + * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema. use scanner instead of get | * */ @@ -47,10 +48,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * @param snapshotEdge: snapshotEdge to serialize * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue] */ - def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = { + def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): Serializable[SnapshotEdge] = { snapshotEdge.schemaVer match { case VERSION1 | VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge) - case VERSION3 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge) + case VERSION3 | VERSION4 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge) case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}") } } @@ -60,9 +61,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * @param indexEdge: indexEdge to serialize * @return serializer implementation */ - def indexEdgeSerializer(indexEdge: IndexEdge) = { + def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = { indexEdge.schemaVer match { case VERSION1 | VERSION2 | VERSION3 => new indexedge.wide.IndexEdgeSerializable(indexEdge) + case VERSION4 => new indexedge.tall.IndexEdgeSerializable(indexEdge) case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}") } @@ -85,19 +87,21 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue. * */ - val snapshotEdgeDeserializers = Map( + val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = Map( VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable, VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable, - VERSION3 -> new tall.SnapshotEdgeDeserializable + VERSION3 -> new tall.SnapshotEdgeDeserializable, + VERSION4 -> new tall.SnapshotEdgeDeserializable ) def snapshotEdgeDeserializer(schemaVer: String) = snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parse stored CanSKeyValue into indexEdge. */ - val indexEdgeDeserializers = Map( + val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map( VERSION1 -> new indexedge.wide.IndexEdgeDeserializable, VERSION2 -> new indexedge.wide.IndexEdgeDeserializable, - VERSION3 -> new indexedge.wide.IndexEdgeDeserializable + VERSION3 -> new indexedge.wide.IndexEdgeDeserializable, + VERSION4 -> new indexedge.tall.IndexEdgeDeserializable ) def indexEdgeDeserializer(schemaVer: String) = @@ -522,7 +526,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } yield { val label = queryRequest.queryParam.label label.schemaVersion match { - case HBaseType.VERSION3 => + case HBaseType.VERSION3 | HBaseType.VERSION4 => if (label.consistencyLevel == "strong") { /** * read: snapshotEdge on queryResult = O(N) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 7c05aed..8441c6b 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -23,7 +23,7 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future, duration} import scala.util.hashing.MurmurHash3 import java.util - +import java.util.Base64 object AsynchbaseStorage { @@ -174,21 +174,73 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte } val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue)) - val get = - if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) - else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf) - get.maxVersions(1) - get.setFailfast(true) - get.setMaxResultsPerColumnFamily(queryParam.limit) - get.setRowOffsetPerColumnFamily(queryParam.offset) - get.setMinTimestamp(minTs) - get.setMaxTimestamp(maxTs) - get.setTimeout(queryParam.rpcTimeoutInMillis) + label.schemaVersion match { + case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty => + val scanner = client.newScanner(label.hbaseTableName.getBytes) + scanner.setFamily(edgeCf) + + /** + * TODO: remove this part. + */ + val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption + val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) + + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + // val labelIndexSeqWithIsInvertedStopBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = true) + val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op))) + val (startKey, stopKey) = + if (queryParam.columnRangeFilter != null) { + // interval is set. + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0)) + case None => Bytes.add(baseKey, queryParam.columnRangeFilterMinBytes) + } + (_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes)) + } else { + /** + * note: since propsToBytes encode size of property map at first byte, we are sure about max value here + */ + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0)) + case None => baseKey + } + (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) + } +// logger.debug(s"[StartKey]: ${startKey.toList}") +// logger.debug(s"[StopKey]: ${stopKey.toList}") + + scanner.setStartKey(startKey) + scanner.setStopKey(stopKey) + + if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") - if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter) + scanner.setMaxVersions(1) + scanner.setMaxNumRows(queryParam.limit) + scanner.setMaxTimestamp(maxTs) + scanner.setMinTimestamp(minTs) + scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis) + // SET option for this rpc properly. + scanner + case _ => + val get = + if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) + else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf) + + get.maxVersions(1) + get.setFailfast(true) + get.setMaxResultsPerColumnFamily(queryParam.limit) + get.setRowOffsetPerColumnFamily(queryParam.offset) + get.setMinTimestamp(minTs) + get.setMaxTimestamp(maxTs) + get.setTimeout(queryParam.rpcTimeoutInMillis) - get + if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter) + + get + } } /** @@ -212,13 +264,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) { sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) } else edgeWithScores - QueryResult(resultEdgesWithScores) - // QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) + QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte])) +// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) } recoverWith { ex => logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex) QueryResult(isFailure = true) - // QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) +// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) } } @@ -232,7 +284,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) val cacheKey = queryParam.toCacheKey(cacheKeyBytes) futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) - } + } defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)} } @@ -478,4 +530,4 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte private def getEndKey(regionCount: Int): Array[Byte] = { Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala index 62af1d7..4b3b3db 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala @@ -6,6 +6,7 @@ import org.apache.hadoop.hbase.util.Bytes * Created by shon on 6/6/15. */ object HBaseType { + val VERSION4 = "v4" val VERSION3 = "v3" val VERSION2 = "v2" val VERSION1 = "v1" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala index 8209462..8146f32 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala @@ -66,7 +66,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { version: String, isVertexId: Boolean): (InnerValLike, Int) = { version match { - case VERSION2 | VERSION3 => v2.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) case _ => throw notSupportedEx(version) } @@ -74,7 +74,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withLong(l: Long, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(l)) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l)) case VERSION1 => v1.InnerVal(Some(l), None, None) case _ => throw notSupportedEx(version) } @@ -82,7 +82,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withInt(i: Int, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(i)) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i)) case VERSION1 => v1.InnerVal(Some(i.toLong), None, None) case _ => throw notSupportedEx(version) } @@ -90,7 +90,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withFloat(f: Float, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(f.toDouble)) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(f.toDouble)) case VERSION1 => v1.InnerVal(Some(f.toLong), None, None) case _ => throw notSupportedEx(version) } @@ -98,7 +98,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withDouble(d: Double, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(BigDecimal(d)) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d)) case VERSION1 => v1.InnerVal(Some(d.toLong), None, None) case _ => throw notSupportedEx(version) } @@ -106,7 +106,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withNumber(num: BigDecimal, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(num) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num) case VERSION1 => v1.InnerVal(Some(num.toLong), None, None) case _ => throw notSupportedEx(version) } @@ -114,7 +114,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withBoolean(b: Boolean, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(b) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b) case VERSION1 => v1.InnerVal(None, None, Some(b)) case _ => throw notSupportedEx(version) } @@ -122,14 +122,14 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { def withBlob(blob: Array[Byte], version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(blob) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(blob) case _ => throw notSupportedEx(version) } } def withStr(s: String, version: String): InnerValLike = { version match { - case VERSION2 | VERSION3 => v2.InnerVal(s) + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s) case VERSION1 => v1.InnerVal(None, Some(s), None) case _ => throw notSupportedEx(version) } @@ -147,7 +147,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId { /** nasty implementation for backward compatability */ def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: String): InnerValLike = { val ret = toVersion match { - case VERSION2 | VERSION3 => + case VERSION2 | VERSION3 | VERSION4 => if (innerVal.isInstanceOf[v1.InnerVal]) { val obj = innerVal.asInstanceOf[v1.InnerVal] obj.valueType match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala index ae9e514..f8bf7af 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala @@ -185,7 +185,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { } ], "consistencyLevel": "strong", - "schemaVersion": "v2", + "schemaVersion": "v4", "compressionAlgorithm": "gz", "hTableName": "$testHTableName" }""" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala index aae108e..f4da49d 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala @@ -198,13 +198,13 @@ class StrongLabelDeleteTest extends IntegrateCommon { object StrongDeleteUtil { val labelName = testLabelName2 +// val labelName = testLabelName val maxTgtId = 10 val batchSize = 10 val testNum = 100 val numOfBatch = 10 def testInner(startTs: Long, src: Long) = { - val labelName = testLabelName2 val lastOps = Array.fill(maxTgtId)("none") var currentTs = startTs @@ -245,18 +245,18 @@ class StrongLabelDeleteTest extends IntegrateCommon { } def bulkEdges(startTs: Int = 0) = Seq( - toEdge(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}"""), - toEdge(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}"""), - toEdge(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}"""), - toEdge(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}"""), - toEdge(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}"""), - toEdge(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}"""), - toEdge(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}"""), - toEdge(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""") + toEdge(startTs + 1, "insert", "e", "0", "1", labelName, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", labelName, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", labelName, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", labelName, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", labelName, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", labelName, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", labelName, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", labelName, s"""{"time": 13}""") ) def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, - labelName: String = testLabelName2, direction: String = "out") = Json.parse( + _labelName: String = labelName, direction: String = "out") = Json.parse( s""" { "srcVertices": [ { "serviceName": "$serviceName", @@ -265,7 +265,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { }], "steps": [ [ { - "label": "$labelName", + "label": "${_labelName}", "direction": "${direction}", "offset": 0, "limit": -1, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala index b80d9c7..2028c44 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -39,12 +39,19 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { (result \ "results").as[List[JsValue]].size should be(0) /** insert should be ignored */ - val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert") - val rets2 = graph.mutateEdges(edgesToStore2, withWait = true) - Await.result(rets2, Duration(20, TimeUnit.MINUTES)) - - result = getEdgesSync(query(0)) - (result \ "results").as[List[JsValue]].size should be(0) + /** + * I am wondering if this is right test case + * This makes sense because hbase think cell is deleted when there are + * insert/delete with same timestamp(version) on same cell. + * This can be different on different storage system so I think + * this test should be removed. + */ +// val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert") +// val rets2 = graph.mutateEdges(edgesToStore2, withWait = true) +// Await.result(rets2, Duration(20, TimeUnit.MINUTES)) +// +// result = getEdgesSync(query(0)) +// (result \ "results").as[List[JsValue]].size should be(0) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala index 4fa7b59..f7c4bc1 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala @@ -42,20 +42,36 @@ trait TestCommonWithModels { val serviceName = "_test_service" val serviceNameV2 = "_test_service_v2" + val serviceNameV3 = "_test_service_v3" + val serviceNameV4 = "_test_service_v4" + val columnName = "user_id" val columnNameV2 = "user_id_v2" + val columnNameV3 = "user_id_v3" + val columnNameV4 = "user_id_v4" + val columnType = "long" val columnTypeV2 = "long" + val columnTypeV3 = "long" + val columnTypeV4 = "long" val tgtColumnName = "itme_id" val tgtColumnNameV2 = "item_id_v2" + val tgtColumnNameV3 = "item_id_v3" + val tgtColumnNameV4 = "item_id_v4" + val tgtColumnType = "string" val tgtColumnTypeV2 = "string" + val tgtColumnTypeV3 = "string" + val tgtColumnTypeV4 = "string" val hTableName = "_test_cases" val preSplitSize = 0 + val labelName = "_test_label" val labelNameV2 = "_test_label_v2" + val labelNameV3 = "_test_label_v3" + val labelNameV4 = "_test_label_v4" val undirectedLabelName = "_test_label_undirected" val undirectedLabelNameV2 = "_test_label_undirected_v2" @@ -79,12 +95,16 @@ trait TestCommonWithModels { implicit val session = AutoSession management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") + management.createService(serviceNameV3, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") + management.createService(serviceNameV4, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") } def deleteTestService() = { implicit val session = AutoSession Management.deleteService(serviceName) Management.deleteService(serviceNameV2) + Management.deleteService(serviceNameV3) + Management.deleteService(serviceNameV4) } def deleteTestLabel() = { @@ -103,8 +123,14 @@ trait TestCommonWithModels { management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4") + management.createLabel(labelNameV3, serviceNameV3, columnNameV3, columnTypeV3, serviceNameV3, tgtColumnNameV3, tgtColumnTypeV3, + isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4") + + management.createLabel(labelNameV4, serviceNameV4, columnNameV4, columnTypeV4, serviceNameV4, tgtColumnNameV4, tgtColumnTypeV4, + isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4") + management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType, - isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4") + isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4") management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4") @@ -114,18 +140,34 @@ trait TestCommonWithModels { def serviceV2 = Service.findByName(serviceNameV2, useCache = false).get + def serviceV3 = Service.findByName(serviceNameV3, useCache = false).get + + def serviceV4 = Service.findByName(serviceNameV4, useCache = false).get + def column = ServiceColumn.find(service.id.get, columnName, useCache = false).get def columnV2 = ServiceColumn.find(serviceV2.id.get, columnNameV2, useCache = false).get + def columnV3 = ServiceColumn.find(serviceV3.id.get, columnNameV3, useCache = false).get + + def columnV4 = ServiceColumn.find(serviceV4.id.get, columnNameV4, useCache = false).get + def tgtColumn = ServiceColumn.find(service.id.get, tgtColumnName, useCache = false).get def tgtColumnV2 = ServiceColumn.find(serviceV2.id.get, tgtColumnNameV2, useCache = false).get + def tgtColumnV3 = ServiceColumn.find(serviceV3.id.get, tgtColumnNameV3, useCache = false).get + + def tgtColumnV4 = ServiceColumn.find(serviceV4.id.get, tgtColumnNameV4, useCache = false).get + def label = Label.findByName(labelName, useCache = false).get def labelV2 = Label.findByName(labelNameV2, useCache = false).get + def labelV3 = Label.findByName(labelNameV3, useCache = false).get + + def labelV4 = Label.findByName(labelNameV4, useCache = false).get + def undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get def undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get @@ -140,7 +182,16 @@ trait TestCommonWithModels { def labelWithDirV2 = LabelWithDirection(labelV2.id.get, dir) + def labelWithDirV3 = LabelWithDirection(labelV3.id.get, dir) + + def labelWithDirV4 = LabelWithDirection(labelV4.id.get, dir) + def queryParam = QueryParam(labelWithDir) def queryParamV2 = QueryParam(labelWithDirV2) + + def queryParamV3 = QueryParam(labelWithDirV3) + + def queryParamV4 = QueryParam(labelWithDirV4) + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e71264d0/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala new file mode 100644 index 0000000..e68fc20 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -0,0 +1,81 @@ +package com.kakao.s2graph.core.storage.hbase + +import com.kakao.s2graph.core.mysqls.{Label, LabelMeta, LabelIndex} +import com.kakao.s2graph.core.{IndexEdge, Vertex, TestCommonWithModels} +import com.kakao.s2graph.core.types._ +import org.scalatest.{FunSuite, Matchers} + + +class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { + initTests() + + /** + * check if storage serializer/deserializer can translate from/to bytes array. + * @param l: label for edge. + * @param ts: timestamp for edge. + * @param to: to VertexId for edge. + * @param props: expected props of edge. + */ + def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLike]): Unit = { + val from = InnerVal.withLong(1, l.schemaVersion) + val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) + val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) + val vertex = Vertex(vertexId, ts) + val tgtVertex = Vertex(tgtVertexId, ts) + val labelWithDir = LabelWithDirection(l.id.get, 0) + + val indexEdge = IndexEdge(vertex, tgtVertex, labelWithDir, 0, ts, LabelIndex.DefaultSeq, props) + val _indexEdgeOpt = graph.storage.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(queryParam, + graph.storage.indexEdgeSerializer(indexEdge).toKeyValues, l.schemaVersion, None) + + _indexEdgeOpt should not be empty + indexEdge should be(_indexEdgeOpt.get) + } + + + /** note that props have to be properly set up for equals */ + test("test serializer/deserializer for index edge.") { + val ts = System.currentTimeMillis() + for { + l <- Seq(label, labelV2, labelV3, labelV4) + } { + val to = InnerVal.withLong(101, l.schemaVersion) + val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, + 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion)) + + check(l, ts, to, props) + } + } + + test("test serializer/deserializer for degree edge.") { + val ts = System.currentTimeMillis() + for { + l <- Seq(label, labelV2, labelV3, labelV4) + } { + val to = InnerVal.withStr("0", l.schemaVersion) + val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val props = Map( + LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion), + LabelMeta.timeStampSeq -> tsInnerVal) + + check(l, ts, to, props) + } + } + + test("test serializer/deserializer for incrementCount index edge.") { + val ts = System.currentTimeMillis() + for { + l <- Seq(label, labelV2, labelV3, labelV4) + } { + val to = InnerVal.withLong(101, l.schemaVersion) + + val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, + 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion), + LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion)) + + check(l, ts, to, props) + } + } +} \ No newline at end of file
