Repository: incubator-s2graph
Updated Branches:
refs/heads/master 625e2217c -> 7951f5520
[S2GRAPH-146]: Merge two IndexEdgeDeserializable under tall, wide package into
one.
JIRA:
[S2GRAPH-146] https://issues.apache.org/jira/browse/S2GRAPH-146
Pull Request:
Closes #111
Authors
DO YUNG YOON: [email protected]
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7951f552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7951f552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7951f552
Branch: refs/heads/master
Commit: 7951f5520be7470135393867d85bd22fb2346419
Parents: 625e221
Author: DO YUNG YOON <[email protected]>
Authored: Sun Dec 11 21:49:53 2016 +0900
Committer: daewon <[email protected]>
Committed: Tue Dec 13 10:44:31 2016 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/JSONParser.scala | 1 +
.../scala/org/apache/s2graph/core/S2Edge.scala | 13 +-
.../scala/org/apache/s2graph/core/S2Graph.scala | 61 ++++--
.../org/apache/s2graph/core/mysqls/Label.scala | 6 +-
.../s2graph/core/mysqls/ServiceColumn.scala | 11 +-
.../s2graph/core/storage/Deserializable.scala | 26 +--
.../apache/s2graph/core/storage/Storage.scala | 45 ++--
.../core/storage/StorageDeserializable.scala | 21 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 38 +++-
.../tall/IndexEdgeDeserializable.scala | 215 ++++++++++---------
.../wide/IndexEdgeDeserializable.scala | 201 +++++++++--------
.../tall/SnapshotEdgeDeserializable.scala | 118 +++++-----
.../wide/SnapshotEdgeDeserializable.scala | 105 ++++-----
.../serde/vertex/VertexDeserializable.scala | 64 +++---
.../apache/s2graph/core/types/HBaseType.scala | 3 +-
.../s2graph/core/types/InnerValLike.scala | 84 ++++----
.../core/Integrate/IntegrateCommon.scala | 2 +-
.../apache/s2graph/core/JsonParserTest.scala | 2 +-
.../org/apache/s2graph/core/TestCommon.scala | 34 +--
.../s2graph/core/TestCommonWithModels.scala | 2 +-
.../s2graph/core/parsers/WhereParserTest.scala | 4 +-
.../core/storage/hbase/IndexEdgeTest.scala | 3 +-
22 files changed, 577 insertions(+), 482 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index 732840e..2effaf1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -182,6 +182,7 @@ object JSONParser {
any match {
case n: BigDecimal =>
if (isNumeric) InnerVal.withNumber(n, version)
+ else if (dType == InnerVal.STRING) InnerVal.withStr(n.toString,
version)
else throw new IllegalDataTypeException(s"[ValueType] = BigDecimal,
[DataType]: $dataType, [Input]: $any")
case l: Long =>
if (isNumeric) InnerVal.withLong(l, version)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 2960265..9cfde75 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -469,17 +469,11 @@ case class S2Edge(innerGraph: S2Graph,
}
override def hashCode(): Int = {
- MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," +
tgtVertex.innerId)
+ id().hashCode()
}
override def equals(other: Any): Boolean = other match {
- case e: S2Edge =>
- srcVertex.innerId == e.srcVertex.innerId &&
- tgtVertex.innerId == e.tgtVertex.innerId &&
- labelWithDir == e.labelWithDir && S2Edge.sameProps(propsWithTs,
e.propsWithTs) &&
- op == e.op && version == e.version &&
- pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode
== statusCode &&
- parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt
+ case e: Edge => e.id().equals(e.id())
case _ => false
}
@@ -568,11 +562,12 @@ case class S2Edge(innerGraph: S2Graph,
override def graph(): Graph = innerGraph
- override def id(): AnyRef = (srcVertex.innerId, labelWithDir,
tgtVertex.innerId)
+ override def id(): AnyRef = EdgeId(srcVertex.innerId, tgtVertex.innerId,
label(), direction)
override def label(): String = innerLabel.label
}
+case class EdgeId(srcVertexId: InnerValLike, tgtVertexId: InnerValLike,
labelName: String, direction: String)
case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index b3f3ac8..e8bda2c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -1120,8 +1120,8 @@ class S2Graph(_config: Config)(implicit val ec:
ExecutionContext) extends Graph
operation: String = "insert"): S2Edge = {
val label = Label.findByName(labelName).getOrElse(throw new
LabelNotExistException(labelName))
- val srcVertexIdInnerVal = toInnerVal(srcId.toString,
label.srcColumn.columnType, label.schemaVersion)
- val tgtVertexIdInnerVal = toInnerVal(tgtId.toString,
label.tgtColumn.columnType, label.schemaVersion)
+ val srcVertexIdInnerVal = toInnerVal(srcId, label.srcColumn.columnType,
label.schemaVersion)
+ val tgtVertexIdInnerVal = toInnerVal(tgtId, label.tgtColumn.columnType,
label.schemaVersion)
val srcVertex = newVertex(SourceVertexId(label.srcColumn,
srcVertexIdInnerVal), System.currentTimeMillis())
val tgtVertex = newVertex(TargetVertexId(label.tgtColumn,
tgtVertexIdInnerVal), System.currentTimeMillis())
@@ -1145,7 +1145,7 @@ class S2Graph(_config: Config)(implicit val ec:
ExecutionContext) extends Graph
val column = ServiceColumn.find(service.id.get,
columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
val op = GraphUtil.toOp(operation).getOrElse(throw new
RuntimeException(s"$operation is not supported."))
- val srcVertexId = VertexId(column, toInnerVal(id.toString,
column.columnType, column.schemaVersion))
+ val srcVertexId = VertexId(column, toInnerVal(id, column.columnType,
column.schemaVersion))
val propsInner = column.propsToInnerVals(props) ++
Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
@@ -1346,28 +1346,55 @@ class S2Graph(_config: Config)(implicit val ec:
ExecutionContext) extends Graph
override def vertices(vertexIds: AnyRef*): util.Iterator[structure.Vertex] =
{
val fetchVertices = vertexIds.lastOption.map { lastParam =>
if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
- else false
- }.getOrElse(false)
+ else true
+ }.getOrElse(true)
- val vertices = for {
- vertexId <- vertexIds if vertexId.isInstanceOf[VertexId]
- } yield newVertex(vertexId.asInstanceOf[VertexId])
-
- if (fetchVertices) {
- val future = getVertices(vertices).map { vs =>
- val ls = new util.ArrayList[structure.Vertex]()
- ls.addAll(vs)
- ls.iterator()
+ if (vertexIds.isEmpty) {
+ //TODO: default storage need to be fixed.
+ Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
+ } else {
+ val vertices = for {
+ vertexId <- vertexIds if vertexId.isInstanceOf[VertexId]
+ } yield newVertex(vertexId.asInstanceOf[VertexId])
+
+ if (fetchVertices) {
+ val future = getVertices(vertices).map { vs =>
+ val ls = new util.ArrayList[structure.Vertex]()
+ ls.addAll(vs)
+ ls.iterator()
+ }
+ Await.result(future, WaitTimeout)
+ } else {
+ vertices.iterator
}
- Await.result(future, WaitTimeout)
+ }
+ }
+
+ override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
+ if (edgeIds.isEmpty) {
+ Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
} else {
- vertices.iterator
+ Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
}
}
+ def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
+ val s2EdgeIds =
edgeIds.filter(_.isInstanceOf[EdgeId]).map(_.asInstanceOf[EdgeId])
+ val edgesToFetch = for {
+ id <- s2EdgeIds
+ } yield {
+ toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
+ }
+
+ checkEdges(edgesToFetch).map { stepResult =>
+ val ls = new util.ArrayList[structure.Edge]
+ stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
+ ls.iterator()
+ }
+ }
override def tx(): Transaction = ???
- override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ???
+
override def variables(): Variables = ???
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 4970912..415a64e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -190,8 +190,8 @@ object Label extends Model[Label] {
val serviceId = service.id.get
/** insert serviceColumn */
- val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName,
Some(srcColumnType), schemaVersion)
- val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName,
Some(tgtColumnType), schemaVersion)
+ val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName,
Some(srcColumnType))
+ val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName,
Some(tgtColumnType))
if (srcCol.columnType != srcColumnType) throw new
RuntimeException(s"source service column type not matched ${srcCol.columnType}
!= ${srcColumnType}")
if (tgtCol.columnType != tgtColumnType) throw new
RuntimeException(s"target service column type not matched ${tgtCol.columnType}
!= ${tgtColumnType}")
@@ -480,7 +480,7 @@ case class Label(id: Option[Int], label: String,
for {
(k, v) <- props
labelMeta <- metaPropsInvMap.get(k)
- innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+ innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
} yield labelMeta -> InnerValLikeWithTs(innerVal, ts)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index ebbbf88..f791f22 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -76,7 +76,7 @@ object ServiceColumn extends Model[ServiceColumn] {
expireCaches(key)
}
}
- def findOrInsert(serviceId: Int, columnName: String, columnType:
Option[String], schemaVersion: String)(implicit session: DBSession =
AutoSession): ServiceColumn = {
+ def findOrInsert(serviceId: Int, columnName: String, columnType:
Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION)(implicit
session: DBSession = AutoSession): ServiceColumn = {
find(serviceId, columnName) match {
case Some(sc) => sc
case None =>
@@ -97,9 +97,14 @@ object ServiceColumn extends Model[ServiceColumn] {
var cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
(cacheKey -> x)
})
+ ls
}
}
-case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String,
columnType: String, schemaVersion: String) {
+case class ServiceColumn(id: Option[Int],
+ serviceId: Int,
+ columnName: String,
+ columnType: String,
+ schemaVersion: String) {
lazy val service = Service.findById(serviceId)
lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get)
:+ ColumnMeta.lastModifiedAtColumn
@@ -112,7 +117,7 @@ case class ServiceColumn(id: Option[Int], serviceId: Int,
columnName: String, co
for {
(k, v) <- props
labelMeta <- metasInvMap.get(k)
- innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+ innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
} yield labelMeta -> innerVal
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
index d82c507..af20483 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.{LabelWithDirection, SourceVertexId,
VertexId}
+import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection,
SourceVertexId, VertexId}
trait Deserializable[E] extends StorageDeserializable[E] {
@@ -28,16 +28,16 @@ trait Deserializable[E] extends StorageDeserializable[E] {
type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
- /** version 1 and version 2 share same code for parsing row key part */
- def parseRow(kv: SKeyValue, version: String): RowKeyRaw = {
- var pos = 0
- val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, version)
- pos += srcIdLen
- val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
- pos += 4
- val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row,
pos)
-
- val rowLen = srcIdLen + 4 + 1
- (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
- }
+// /** version 1 and version 2 share same code for parsing row key part */
+// def parseRow(kv: SKeyValue, version: String = HBaseType.DEFAULT_VERSION):
RowKeyRaw = {
+// var pos = 0
+// val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, version)
+// pos += srcIdLen
+// val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+// pos += 4
+// val (labelIdxSeq, isInverted) =
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+//
+// val rowLen = srcIdLen + 4 + 1
+// (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 59b7518..e067983 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -75,7 +75,8 @@ abstract class Storage[Q, R](val graph: S2Graph,
*/
def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge):
Serializable[SnapshotEdge] = {
snapshotEdge.schemaVer match {
- case VERSION1 | VERSION2 => new
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
+// case VERSION1 |
+ case VERSION2 => new
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
case VERSION3 | VERSION4 => new
serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
case _ => throw new RuntimeException(s"not supported version:
${snapshotEdge.schemaVer}")
}
@@ -88,7 +89,8 @@ abstract class Storage[Q, R](val graph: S2Graph,
*/
def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
indexEdge.schemaVer match {
- case VERSION1 | VERSION2 | VERSION3 => new
IndexEdgeSerializable(indexEdge)
+// case VERSION1
+ case VERSION2 | VERSION3 => new IndexEdgeSerializable(indexEdge)
case VERSION4 => new
serde.indexedge.tall.IndexEdgeSerializable(indexEdge)
case _ => throw new RuntimeException(s"not supported version:
${indexEdge.schemaVer}")
@@ -112,25 +114,14 @@ abstract class Storage[Q, R](val graph: S2Graph,
* then that storage implementation is responsible to provide implicit type
conversion method on CanSKeyValue.
* */
- val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] =
Map(
- VERSION1 -> new SnapshotEdgeDeserializable(graph),
- VERSION2 -> new SnapshotEdgeDeserializable(graph),
- VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph),
- VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
- )
- def snapshotEdgeDeserializer(schemaVer: String) =
- snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new
RuntimeException(s"not supported version: ${schemaVer}"))
+ val snapshotEdgeDeserializer: Deserializable[SnapshotEdge] = new
serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
+
+ def snapshotEdgeDeserializer(schemaVer: String):
Deserializable[SnapshotEdge] = snapshotEdgeDeserializer
/** create deserializer that can parse stored CanSKeyValue into indexEdge. */
- val indexEdgeDeserializers: Map[String, Deserializable[S2Edge]] = Map(
- VERSION1 -> new IndexEdgeDeserializable(graph),
- VERSION2 -> new IndexEdgeDeserializable(graph),
- VERSION3 -> new IndexEdgeDeserializable(graph),
- VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable(graph)
- )
+ val indexEdgeDeserializer: Deserializable[S2Edge] = new
serde.indexedge.tall.IndexEdgeDeserializable(graph)
- def indexEdgeDeserializer(schemaVer: String) =
- indexEdgeDeserializers.get(schemaVer).getOrElse(throw new
RuntimeException(s"not supported version: ${schemaVer}"))
+ def indexEdgeDeserializer(schemaVer: String) = new
serde.indexedge.tall.IndexEdgeDeserializable(graph)
/** create deserializer that can parser stored CanSKeyValue into vertex. */
val vertexDeserializer: Deserializable[S2Vertex] = new
VertexDeserializable(graph)
@@ -251,6 +242,10 @@ abstract class Storage[Q, R](val graph: S2Graph,
def flush(): Unit = {
}
+ def fetchEdgesAll(): Future[Seq[S2Edge]]
+
+ def fetchVerticesAll(): Future[Seq[S2Vertex]]
+
/**
* create table on storage.
* if storage implementation does not support namespace or table, then there
is nothing to be done
@@ -280,7 +275,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
def fromResult(kvs: Seq[SKeyValue],
version: String): Option[S2Vertex] = {
if (kvs.isEmpty) None
- else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+ else vertexDeserializer.fromKeyValues(kvs, None)
// .map(S2Vertex(graph, _))
}
@@ -837,7 +832,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
val queryOption = queryRequest.query.queryOption
val queryParam = queryRequest.queryParam
val schemaVer = queryParam.label.schemaVersion
- val indexEdgeOpt =
indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label),
Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+ val indexEdgeOpt =
indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge =>
indexEdge.copy(parentEdges = parentEdges))
else indexEdgeOpt
} catch {
@@ -855,7 +850,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
// logger.debug(s"SnapshottoEdge: $kv")
val queryParam = queryRequest.queryParam
val schemaVer = queryParam.label.schemaVersion
- val snapshotEdgeOpt =
snapshotEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label),
Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+ val snapshotEdgeOpt =
snapshotEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
if (isInnerCall) {
snapshotEdgeOpt.flatMap { snapshotEdge =>
@@ -901,7 +896,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
val schemaVer = queryParam.label.schemaVersion
val cacheElementOpt =
if (queryParam.isSnapshotEdge) None
- else
indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label),
Seq(kv), queryParam.label.schemaVersion, None)
+ else indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), None)
val (degreeEdges, keyValues) = cacheElementOpt match {
case None => (Nil, kvs)
@@ -971,14 +966,14 @@ abstract class Storage[Q, R](val graph: S2Graph,
tgtVertexIdOpt match {
case Some(tgtVertexId) => // _to is given.
/** we use toSnapshotEdge so dont need to swap src, tgt */
- val src = InnerVal.convertVersion(srcVertex.innerId,
srcColumn.columnType, label.schemaVersion)
- val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType,
label.schemaVersion)
+ val src = srcVertex.innerId
+ val tgt = tgtVertexId
val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src),
TargetVertexId(tgtColumn, tgt))
val (srcV, tgtV) = (graph.newVertex(srcVId), graph.newVertex(tgtVId))
graph.newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs =
propsWithTs)
case None =>
- val src = InnerVal.convertVersion(srcVertex.innerId,
srcColumn.columnType, label.schemaVersion)
+ val src = srcVertex.innerId
val srcVId = SourceVertexId(srcColumn, src)
val srcV = graph.newVertex(srcVId)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
index d2a7de7..811cf62 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
@@ -106,14 +106,15 @@ object StorageDeserializable {
}
trait StorageDeserializable[E] {
- def fromKeyValues[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T],
version: String, cacheElementOpt: Option[E]): Option[E] = {
- try {
- Option(fromKeyValuesInner(checkLabel, kvs, version, cacheElementOpt))
- } catch {
- case e: Exception =>
- logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
- None
- }
- }
- def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], kvs:
Seq[T], version: String, cacheElementOpt: Option[E]): E
+ def fromKeyValues[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]):
Option[E]
+// = {
+// try {
+// Option(fromKeyValuesInner(kvs, cacheElementOpt))
+// } catch {
+// case e: Exception =>
+// logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
+// None
+// }
+// }
+// def fromKeyValuesInner[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt:
Option[E]): E
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 93b2454..676ba41 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor,
HTableDescriptor, TableName}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC,
ScanWithRange}
import org.apache.s2graph.core.types.{HBaseType, VertexId}
@@ -110,6 +110,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
lazy val clients = Seq(client, clientWithFlush)
private val emptyKeyValues = new util.ArrayList[KeyValue]()
+ private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
private val emptyStepResult = new util.ArrayList[StepResult]()
private def client(withWait: Boolean): HBaseClient = if (withWait)
clientWithFlush else client
@@ -522,7 +523,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
version: String): Option[S2Vertex] = {
if (kvs.isEmpty) None
- else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+ else vertexDeserializer.fromKeyValues(kvs, None)
// .map(S2Vertex(graph, _))
}
@@ -542,6 +543,39 @@ class AsynchbaseStorage(override val graph: S2Graph,
Future.sequence(futures).map { result => result.toList.flatten }
}
+ override def fetchEdgesAll(): Future[Seq[S2Edge]] = {
+ val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case
(hTableName, labels) =>
+ val scan = AsynchbasePatcher.newScanner(client, hTableName)
+ scan.setFamily(Serializable.edgeCf)
+ scan.setMaxVersions(1)
+
+ scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { kvsLs =>
+ kvsLs.flatMap { kvs =>
+ kvs.flatMap { kv =>
+ indexEdgeDeserializer.fromKeyValues(Seq(kv), None)
+ }
+ }
+ }
+ }
+
+ Future.sequence(futures).map(_.flatten)
+ }
+
+ override def fetchVerticesAll(): Future[Seq[S2Vertex]] = {
+ val futures =
ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case
(hTableName, columns) =>
+ val scan = AsynchbasePatcher.newScanner(client, hTableName)
+ scan.setFamily(Serializable.vertexCf)
+ scan.setMaxVersions(1)
+
+ scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { kvsLs =>
+ kvsLs.flatMap { kvs =>
+ vertexDeserializer.fromKeyValues(kvs, None)
+ }
+ }
+ }
+ Future.sequence(futures).map(_.flatten)
+ }
+
class V4ResultHandler(scanner: Scanner, defer:
Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends
Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {
val results = new util.ArrayList[KeyValue]()
var offsetCount = 0
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 5549f4e..8f47f97 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -20,13 +20,11 @@
package org.apache.s2graph.core.storage.serde.indexedge.tall
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable,
StorageDeserializable}
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core._
-import scala.collection.immutable
object IndexEdgeDeserializable{
def getNewInstance(graph: S2Graph) = new IndexEdgeDeserializable(graph)
@@ -38,101 +36,126 @@ class IndexEdgeDeserializable(graph: S2Graph,
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte,
Boolean, Int)
type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
- override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
- _kvs: Seq[T],
- schemaVer: String,
- cacheElementOpt:
Option[S2Edge]): S2Edge = {
-
- assert(_kvs.size == 1)
-
- // val kvs = _kvs.map { kv =>
implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
- val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
-// logger.debug(s"[DES]: ${kv.toLogString}}")
-
- val version = kv.timestamp
- // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList},
${kv.value.toList}")
- var pos = 0
- val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, schemaVer)
- pos += srcIdLen
- val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
- pos += 4
- val (labelIdxSeq, isInverted) =
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
- pos += 1
-
- val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
-
- val srcVertex = graph.newVertex(srcVertexId, version)
- //TODO:
- val edge = graph.newEdge(srcVertex, null,
- label, labelWithDir.dir, GraphUtil.defaultOpByte, version,
S2Edge.EmptyState)
- var tsVal = version
-
- if (pos == kv.row.length) {
- // degree
- // val degreeVal = Bytes.toLong(kv.value)
- val degreeVal = bytesToLongFunc(kv.value, 0)
- val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0",
schemaVer))
-
- edge.property(LabelMeta.timestamp.name, version, version)
- edge.property(LabelMeta.degree.name, degreeVal, version)
- edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = GraphUtil.defaultOpByte
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
- edge
- } else {
- // not degree edge
- val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
- pos = endAt
-
-
- val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length - 1) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1, schemaVer)
- }
- val op = kv.row(kv.row.length-1)
-
- val index = label.indicesMap.getOrElse(labelIdxSeq, throw new
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
- /** process indexProps */
- val size = idxPropsRaw.length
- (0 until size).foreach { ith =>
- val meta = index.sortKeyTypesArray(ith)
- val (k, v) = idxPropsRaw(ith)
- if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
-
- if (k == LabelMeta.degree) {
- edge.property(LabelMeta.degree.name, v.value, version)
+ override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+ cacheElementOpt:
Option[S2Edge]): Option[S2Edge] = {
+
+ try {
+ assert(_kvs.size == 1)
+
+ // val kvs = _kvs.map { kv =>
implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+ // logger.debug(s"[DES]: ${kv.toLogString}}")
+
+ val version = kv.timestamp
+
+ var pos = 0
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, HBaseType.DEFAULT_VERSION)
+ pos += srcIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) =
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+ pos += 1
+
+ if (isInverted) None
+ else {
+ val label = Label.findById(labelWithDir.labelId)
+ val schemaVer = label.schemaVersion
+ val srcVertex = graph.newVertex(srcVertexId, version)
+ //TODO:
+ val edge = graph.newEdge(srcVertex, null,
+ label, labelWithDir.dir, GraphUtil.defaultOpByte, version,
S2Edge.EmptyState)
+ var tsVal = version
+ val isTallSchema = label.schemaVersion == HBaseType.VERSION4
+ val isDegree = if (isTallSchema) pos == kv.row.length else
kv.qualifier.isEmpty
+
+ if (isDegree) {
+ // degree
+ // val degreeVal = Bytes.toLong(kv.value)
+ val degreeVal = bytesToLongFunc(kv.value, 0)
+ val tgtVertexId = VertexId(ServiceColumn.Default,
InnerVal.withStr("0", schemaVer))
+
+ edge.property(LabelMeta.timestamp.name, version, version)
+ edge.property(LabelMeta.degree.name, degreeVal, version)
+ edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+ edge.op = GraphUtil.defaultOpByte
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
} else {
- edge.property(meta.name, v.value, version)
+ // not degree edge
+ val (idxPropsRaw, endAt) =
+ if (isTallSchema) bytesToProps(kv.row, pos, schemaVer)
+ else {
+ bytesToProps(kv.qualifier, 0, schemaVer)
+ }
+ pos = endAt
+
+ val (tgtVertexIdRaw, tgtVertexIdLen) = if (isTallSchema) {
+ if (endAt == kv.row.length - 1) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1,
schemaVer)
+ }
+ } else {
+ if (endAt == kv.qualifier.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.qualifier, endAt,
kv.qualifier.length, schemaVer)
+ }
+ }
+ pos += tgtVertexIdLen
+
+ val op =
+ if (isTallSchema) kv.row(kv.row.length - 1)
+ else {
+ if (kv.qualifier.length == pos) GraphUtil.defaultOpByte
+ else kv.qualifier(kv.qualifier.length - 1)
+ }
+
+ val index = label.indicesMap.getOrElse(labelIdxSeq, throw new
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+ /** process indexProps */
+ val size = idxPropsRaw.length
+ (0 until size).foreach { ith =>
+ val meta = index.sortKeyTypesArray(ith)
+ val (k, v) = idxPropsRaw(ith)
+ if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
+
+ if (k == LabelMeta.degree) {
+ edge.property(LabelMeta.degree.name, v.value, version)
+ } else {
+ edge.property(meta.name, v.value, version)
+ }
+ }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ edge.property(LabelMeta.count.name, countVal, version)
+ } else {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0,
kv.value.length, schemaVer, label)
+ props.foreach { case (k, v) =>
+ if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
+
+ edge.property(k.name, v.value, version)
+ }
+ }
+
+ /** process tgtVertexId */
+ val tgtVertexId =
+ if (edge.checkProperty(LabelMeta.to.name)) {
+ val vId =
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+ TargetVertexId(ServiceColumn.Default, vId.innerVal)
+ } else tgtVertexIdRaw
+
+ edge.property(LabelMeta.timestamp.name, tsVal, version)
+ edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+ edge.op = op
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
}
+ Option(edge)
}
-
- /** process props */
- if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- edge.property(LabelMeta.count.name, countVal, version)
- } else {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length,
schemaVer, label)
- props.foreach { case (k, v) =>
- if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
-
- edge.property(k.name, v.value, version)
- }
- }
-
- /** process tgtVertexId */
- val tgtVertexId =
- if (edge.checkProperty(LabelMeta.to.name)) {
- val vId =
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
- TargetVertexId(ServiceColumn.Default, vId.innerVal)
- } else tgtVertexIdRaw
-
-
- edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = op
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
- edge
+ } catch {
+ case e: Exception =>
+ None
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 31a1a89..09d7f4c 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -19,12 +19,12 @@
package org.apache.s2graph.core.storage.serde.indexedge.wide
-import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core._
-import scala.collection.immutable
class IndexEdgeDeserializable(graph: S2Graph,
bytesToLongFunc: (Array[Byte], Int) => Long =
bytesToLong) extends Deserializable[S2Edge] {
@@ -33,105 +33,104 @@ class IndexEdgeDeserializable(graph: S2Graph,
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte,
Boolean, Int)
type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
- private def parseDegreeQualifier(kv: SKeyValue, schemaVer: String):
QualifierRaw = {
- // val degree = Bytes.toLong(kv.value)
- val degree = bytesToLongFunc(kv.value, 0)
- val idxPropsRaw = Array(LabelMeta.degree -> InnerVal.withLong(degree,
schemaVer))
- val tgtVertexIdRaw = VertexId(ServiceColumn.Default,
InnerVal.withStr("0", schemaVer))
- (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
- }
-
- private def parseQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw
= {
- var qualifierLen = 0
- var pos = 0
- val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
- val (props, endAt) = bytesToProps(kv.qualifier, pos, schemaVer)
- pos = endAt
- qualifierLen += endAt
- val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length,
schemaVer)
- }
- qualifierLen += tgtVertexIdLen
- (props, endAt, tgtVertexId, tgtVertexIdLen)
- }
- val (op, opLen) =
- if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
- else (kv.qualifier(qualifierLen), 1)
-
- qualifierLen += opLen
-
- (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
- }
-
- override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
- _kvs: Seq[T],
- schemaVer: String,
- cacheElementOpt:
Option[S2Edge]): S2Edge = {
- assert(_kvs.size == 1)
-
-// val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
- val version = kv.timestamp
-
-// val (srcVertexId, labelWithDir, labelIdxSeq, _, _) =
cacheElementOpt.map { e =>
-// (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-// }.getOrElse(parseRow(kv, schemaVer))
- val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = parseRow(kv,
schemaVer)
-
- val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
- val srcVertex = graph.newVertex(srcVertexId, version)
- //TODO:
- val edge = graph.newEdge(srcVertex, null,
- label, labelWithDir.dir, GraphUtil.defaultOpByte, version,
S2Edge.EmptyState)
- var tsVal = version
-
- val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
- if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
- else parseQualifier(kv, schemaVer)
-
- val index = label.indicesMap.getOrElse(labelIdxSeq, throw new
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
-
- /** process indexProps */
- val size = idxPropsRaw.length
- (0 until size).foreach { ith =>
- val meta = index.sortKeyTypesArray(ith)
- val (k, v) = idxPropsRaw(ith)
- if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
-
- if (k == LabelMeta.degree) {
- edge.property(LabelMeta.degree.name, v.value, version)
- } else {
- edge.property(meta.name, v.value, version)
- }
- }
-
- /** process props */
- if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- edge.property(LabelMeta.count.name, countVal, version)
- } else {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length,
schemaVer, label)
- props.foreach { case (k, v) =>
- if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
-
- edge.property(k.name, v.value, version)
+ override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+ cacheElementOpt:
Option[S2Edge]): Option[S2Edge] = {
+ try {
+ assert(_kvs.size == 1)
+
+ // val kvs = _kvs.map { kv =>
implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+ val version = kv.timestamp
+
+ var pos = 0
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, HBaseType.DEFAULT_VERSION)
+ pos += srcIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) =
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+ pos += 1
+
+ if (isInverted) None
+ else {
+ val label = Label.findById(labelWithDir.labelId)
+ val schemaVer = label.schemaVersion
+ val srcVertex = graph.newVertex(srcVertexId, version)
+ //TODO:
+ val edge = graph.newEdge(srcVertex, null,
+ label, labelWithDir.dir, GraphUtil.defaultOpByte, version,
S2Edge.EmptyState)
+ var tsVal = version
+
+ if (kv.qualifier.isEmpty) {
+ val degreeVal = bytesToLongFunc(kv.value, 0)
+ val tgtVertexId = VertexId(ServiceColumn.Default,
InnerVal.withStr("0", schemaVer))
+
+ edge.property(LabelMeta.timestamp.name, version, version)
+ edge.property(LabelMeta.degree.name, degreeVal, version)
+ edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+ edge.op = GraphUtil.defaultOpByte
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ } else {
+ pos = 0
+ val (idxPropsRaw, endAt) = bytesToProps(kv.qualifier, pos,
schemaVer)
+ pos = endAt
+
+ val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt ==
kv.qualifier.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.qualifier, endAt,
kv.qualifier.length, schemaVer)
+ }
+ pos += tgtVertexIdLen
+ val op =
+ if (kv.qualifier.length == pos) GraphUtil.defaultOpByte
+ else kv.qualifier(kv.qualifier.length-1)
+
+ val index = label.indicesMap.getOrElse(labelIdxSeq, throw new
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+
+ /** process indexProps */
+ val size = idxPropsRaw.length
+ (0 until size).foreach { ith =>
+ val meta = index.sortKeyTypesArray(ith)
+ val (k, v) = idxPropsRaw(ith)
+ if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
+
+ if (k == LabelMeta.degree) {
+ edge.property(LabelMeta.degree.name, v.value, version)
+ } else {
+ edge.property(meta.name, v.value, version)
+ }
+ }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ edge.property(LabelMeta.count.name, countVal, version)
+ } else {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0,
kv.value.length, schemaVer, label)
+ props.foreach { case (k, v) =>
+ if (k == LabelMeta.timestamp) tsVal =
v.value.asInstanceOf[BigDecimal].longValue()
+
+ edge.property(k.name, v.value, version)
+ }
+ }
+ /** process tgtVertexId */
+ val tgtVertexId =
+ if (edge.checkProperty(LabelMeta.to.name)) {
+ val vId =
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+ TargetVertexId(ServiceColumn.Default, vId.innerVal)
+ } else tgtVertexIdRaw
+
+ edge.property(LabelMeta.timestamp.name, tsVal, version)
+ edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+ edge.op = op
+ edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ }
+
+ Option(edge)
}
+ } catch {
+ case e: Exception => None
}
- /** process tgtVertexId */
- val tgtVertexId =
- if (edge.checkProperty(LabelMeta.to.name)) {
- val vId =
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
- TargetVertexId(ServiceColumn.Default, vId.innerVal)
- } else tgtVertexIdRaw
-
- edge.property(LabelMeta.timestamp.name, tsVal, version)
- edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = op
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
- edge
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index f4802c0..3b55ed8 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -23,8 +23,8 @@ import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelIndex,
LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable,
SKeyValue, StorageDeserializable}
-import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection,
SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core._
class SnapshotEdgeDeserializable(graph: S2Graph) extends
Deserializable[SnapshotEdge] {
@@ -34,72 +34,76 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends
Deserializable[Snapshot
(statusCode.toByte, op.toByte)
}
- override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
- _kvs: Seq[T],
- version: String,
- cacheElementOpt:
Option[SnapshotEdge]): SnapshotEdge = {
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
- assert(kvs.size == 1)
+ override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+ cacheElementOpt:
Option[SnapshotEdge]): Option[SnapshotEdge] = {
+ try {
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ assert(kvs.size == 1)
- val kv = kvs.head
- val label = checkLabel.get
- val schemaVer = label.schemaVersion
- val cellVersion = kv.timestamp
- /** rowKey */
- def parseRowV3(kv: SKeyValue, version: String) = {
+ val kv = kvs.head
+ val version = kv.timestamp
var pos = 0
- val (srcIdAndTgtId, srcIdAndTgtIdLen) =
SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
- pos += srcIdAndTgtIdLen
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, HBaseType.DEFAULT_VERSION)
+ pos += srcIdLen
+
+ val isTallSchema = pos + 5 != kv.row.length
+ var tgtVertexId = TargetVertexId(ServiceColumn.Default,
srcVertexId.innerId)
+
+ if (isTallSchema) {
+ val (tgtId, tgtBytesLen) = InnerVal.fromBytes(kv.row, pos,
kv.row.length, HBaseType.DEFAULT_VERSION)
+ tgtVertexId = TargetVertexId(ServiceColumn.Default, tgtId)
+ pos += tgtBytesLen
+ }
+
val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
pos += 4
val (labelIdxSeq, isInverted) =
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+ pos += 1
- val rowLen = srcIdAndTgtIdLen + 4 + 1
- (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir,
labelIdxSeq, isInverted, rowLen)
+ if (!isInverted) None
+ else {
+ val label = Label.findById(labelWithDir.labelId)
+ val schemaVer = label.schemaVersion
+// val srcVertexId = SourceVertexId(ServiceColumn.Default,
srcIdAndTgtId.srcInnerId)
+// val tgtVertexId = SourceVertexId(ServiceColumn.Default,
tgtId.tgtInnerId)
- }
- val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map
{ e =>
- (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir,
LabelIndex.DefaultSeq, true, 0)
- }.getOrElse(parseRowV3(kv, schemaVer))
+ var pos = 0
+ val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+ pos += 1
+ val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer,
label)
+ val kvsMap = props.toMap
+ val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+ val ts = tsInnerVal.toString.toLong
+ pos = endAt
- val srcVertexId = SourceVertexId(ServiceColumn.Default, srcInnerId)
- val tgtVertexId = SourceVertexId(ServiceColumn.Default, tgtInnerId)
+ val _pendingEdgeOpt =
+ if (pos == kv.value.length) None
+ else {
+ val (pendingEdgeStatusCode, pendingEdgeOp) =
statusCodeWithOp(kv.value(pos))
+ pos += 1
+ // val versionNum = Bytes.toLong(kv.value, pos, 8)
+ // pos += 8
+ val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value,
pos, schemaVer, label)
+ pos = endAt
+ val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
- val (props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
- var pos = 0
- val (statusCode, op) = statusCodeWithOp(kv.value(pos))
- pos += 1
- val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer,
label)
- val kvsMap = props.toMap
- val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
- val ts = tsInnerVal.toString.toLong
+ val pendingEdge =
+ graph.newEdge(graph.newVertex(srcVertexId, version),
+ graph.newVertex(tgtVertexId, version),
+ label, labelWithDir.dir, pendingEdgeOp,
+ version, pendingEdgeProps.toMap,
+ statusCode = pendingEdgeStatusCode, lockTs = lockTs,
tsInnerValOpt = Option(tsInnerVal))
+ Option(pendingEdge)
+ }
- pos = endAt
- val _pendingEdgeOpt =
- if (pos == kv.value.length) None
- else {
- val (pendingEdgeStatusCode, pendingEdgeOp) =
statusCodeWithOp(kv.value(pos))
- pos += 1
- // val versionNum = Bytes.toLong(kv.value, pos, 8)
- // pos += 8
- val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value,
pos, schemaVer, label)
- pos = endAt
- val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+ val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId,
ts), graph.newVertex(tgtVertexId, ts),
+ label, labelWithDir.dir, op, version, props.toMap, statusCode =
statusCode,
+ pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt =
Option(tsInnerVal))
- val pendingEdge =
- graph.newEdge(graph.newVertex(srcVertexId, cellVersion),
- graph.newVertex(tgtVertexId, cellVersion),
- label, labelWithDir.dir, pendingEdgeOp,
- cellVersion, pendingEdgeProps.toMap,
- statusCode = pendingEdgeStatusCode, lockTs = lockTs,
tsInnerValOpt = Option(tsInnerVal))
- Option(pendingEdge)
- }
-
- (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
+ Option(snapshotEdge)
+ }
+ } catch {
+ case e: Exception => None
}
-
- graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts),
graph.newVertex(tgtVertexId, ts),
- label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
- pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt =
Option(tsInnerVal))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index d3dec1e..78ac2f7 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -23,8 +23,8 @@ import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable,
StorageDeserializable}
-import org.apache.s2graph.core.types.TargetVertexId
-import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex}
+import org.apache.s2graph.core.types.{LabelWithDirection, HBaseType,
SourceVertexId, TargetVertexId}
+import org.apache.s2graph.core._
class SnapshotEdgeDeserializable(graph: S2Graph) extends
Deserializable[SnapshotEdge] {
@@ -34,58 +34,67 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends
Deserializable[Snapshot
(statusCode.toByte, op.toByte)
}
- override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
- _kvs: Seq[T],
- version: String,
- cacheElementOpt:
Option[SnapshotEdge]): SnapshotEdge = {
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
- assert(kvs.size == 1)
+ override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+ cacheElementOpt:
Option[SnapshotEdge]): Option[SnapshotEdge] = {
+ try {
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ assert(kvs.size == 1)
- val kv = kvs.head
- val label = checkLabel.get
- val schemaVer = label.schemaVersion
- val cellVersion = kv.timestamp
-
- val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
- }.getOrElse(parseRow(kv, schemaVer))
-
- val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
= {
- val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0,
kv.qualifier.length, schemaVer)
+ val kv = kvs.head
+ val version = kv.timestamp
var pos = 0
- val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos,
kv.row.length, HBaseType.DEFAULT_VERSION)
+ pos += srcIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) =
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
pos += 1
- val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer,
label)
- val kvsMap = props.toMap
- val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
- val ts = tsInnerVal.toString.toLong
- pos = endAt
- val _pendingEdgeOpt =
- if (pos == kv.value.length) None
- else {
- val (pendingEdgeStatusCode, pendingEdgeOp) =
statusCodeWithOp(kv.value(pos))
- pos += 1
- // val versionNum = Bytes.toLong(kv.value, pos, 8)
- // pos += 8
- val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value,
pos, schemaVer, label)
- pos = endAt
- val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+ if (!isInverted) None
+ else {
+ val label = Label.findById(labelWithDir.labelId)
+ val schemaVer = label.schemaVersion
+ val srcVertex = graph.newVertex(srcVertexId, version)
- val pendingEdge =
- graph.newEdge(graph.newVertex(srcVertexId, cellVersion),
- graph.newVertex(tgtVertexId, cellVersion),
- label, labelWithDir.dir, pendingEdgeOp,
- cellVersion, pendingEdgeProps.toMap,
- statusCode = pendingEdgeStatusCode, lockTs = lockTs,
tsInnerValOpt = Option(tsInnerVal))
- Option(pendingEdge)
- }
+ val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0,
kv.qualifier.length, schemaVer)
- (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
- }
+ var pos = 0
+ val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+ pos += 1
+ val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer,
label)
+ val kvsMap = props.toMap
+ val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+ val ts = tsInnerVal.toString.toLong
+ pos = endAt
+
+ val _pendingEdgeOpt =
+ if (pos == kv.value.length) None
+ else {
+ val (pendingEdgeStatusCode, pendingEdgeOp) =
statusCodeWithOp(kv.value(pos))
+ pos += 1
+ // val versionNum = Bytes.toLong(kv.value, pos, 8)
+ // pos += 8
+ val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value,
pos, schemaVer, label)
+ pos = endAt
+ val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
- graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts),
graph.newVertex(tgtVertexId, ts),
- label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
- pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt =
Option(tsInnerVal))
+ val pendingEdge =
+ graph.newEdge(graph.newVertex(srcVertexId, version),
+ graph.newVertex(tgtVertexId, version),
+ label, labelWithDir.dir, pendingEdgeOp,
+ version, pendingEdgeProps.toMap,
+ statusCode = pendingEdgeStatusCode, lockTs = lockTs,
tsInnerValOpt = Option(tsInnerVal))
+ Option(pendingEdge)
+ }
+
+ val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId,
ts), graph.newVertex(tgtVertexId, ts),
+ label, labelWithDir.dir, op, version, props.toMap, statusCode =
statusCode,
+ pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt =
Option(tsInnerVal))
+
+ Option(snapshotEdge)
+ }
+ } catch {
+ case e: Exception => None
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index ee93505..b4a00e6 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -22,49 +22,51 @@ package org.apache.s2graph.core.storage.serde.vertex
import org.apache.s2graph.core.mysqls.{ColumnMeta, Label}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike,
VertexId}
import org.apache.s2graph.core.{S2Graph, QueryParam, S2Vertex}
import scala.collection.mutable.ListBuffer
class VertexDeserializable(graph: S2Graph,
bytesToInt: (Array[Byte], Int) => Int = bytesToInt)
extends Deserializable[S2Vertex] {
- def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
- _kvs: Seq[T],
- version: String,
- cacheElementOpt: Option[S2Vertex]):
S2Vertex = {
+ def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+ cacheElementOpt: Option[S2Vertex]):
Option[S2Vertex] = {
+ try {
+ val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ val kv = kvs.head
+ val version = HBaseType.DEFAULT_VERSION
+ val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
- val kv = kvs.head
- val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
+ var maxTs = Long.MinValue
+ val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike]
+ val belongLabelIds = new ListBuffer[Int]
- var maxTs = Long.MinValue
- val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike]
- val belongLabelIds = new ListBuffer[Int]
+ for {
+ kv <- kvs
+ } {
+ val propKey =
+ if (kv.qualifier.length == 1) kv.qualifier.head.toInt
+ else bytesToInt(kv.qualifier, 0)
- for {
- kv <- kvs
- } {
- val propKey =
- if (kv.qualifier.length == 1) kv.qualifier.head.toInt
- else bytesToInt(kv.qualifier, 0)
+ val ts = kv.timestamp
+ if (ts > maxTs) maxTs = ts
- val ts = kv.timestamp
- if (ts > maxTs) maxTs = ts
-
- if (S2Vertex.isLabelId(propKey)) {
- belongLabelIds += S2Vertex.toLabelId(propKey)
- } else {
- val v = kv.value
- val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
- val columnMeta = vertexId.column.metasMap(propKey)
- propsMap += (columnMeta -> value)
+ if (S2Vertex.isLabelId(propKey)) {
+ belongLabelIds += S2Vertex.toLabelId(propKey)
+ } else {
+ val v = kv.value
+ val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
+ val columnMeta = vertexId.column.metasMap(propKey)
+ propsMap += (columnMeta -> value)
+ }
}
+ assert(maxTs != Long.MinValue)
+ val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps,
belongLabelIds = belongLabelIds)
+ S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
+ Option(vertex)
+ } catch {
+ case e: Exception => None
}
- assert(maxTs != Long.MinValue)
- val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps,
belongLabelIds = belongLabelIds)
- S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
- vertex
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index b885bc6..77951c0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -26,7 +26,8 @@ object HBaseType {
val VERSION4 = "v4"
val VERSION3 = "v3"
val VERSION2 = "v2"
- val VERSION1 = "v1"
+ val ValidVersions = List(VERSION4, VERSION3, VERSION2)
+// val VERSION1 = "v1"
// val DEFAULT_VERSION = VERSION2
val DEFAULT_VERSION = VERSION3
// val EMPTY_SEQ_BYTE = Byte.MaxValue
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
index ea7aa41..c37728c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
@@ -84,7 +84,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
isVertexId: Boolean): (InnerValLike, Int) = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes,
offset, len, version, isVertexId)
- case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version,
isVertexId)
+// case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version,
isVertexId)
case _ => throw notSupportedEx(version)
}
}
@@ -92,7 +92,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withLong(l: Long, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l))
- case VERSION1 => v1.InnerVal(Some(l), None, None)
+// case VERSION1 => v1.InnerVal(Some(l), None, None)
case _ => throw notSupportedEx(version)
}
}
@@ -100,7 +100,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withInt(i: Int, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i))
- case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
+// case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
case _ => throw notSupportedEx(version)
}
}
@@ -108,7 +108,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withFloat(f: Float, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 =>
v2.InnerVal(BigDecimal(f.toDouble))
- case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
+// case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
case _ => throw notSupportedEx(version)
}
}
@@ -116,7 +116,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withDouble(d: Double, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d))
- case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
+// case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
case _ => throw notSupportedEx(version)
}
}
@@ -124,7 +124,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withNumber(num: BigDecimal, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num)
- case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
+// case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
case _ => throw notSupportedEx(version)
}
}
@@ -132,7 +132,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withBoolean(b: Boolean, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b)
- case VERSION1 => v1.InnerVal(None, None, Some(b))
+// case VERSION1 => v1.InnerVal(None, None, Some(b))
case _ => throw notSupportedEx(version)
}
}
@@ -147,7 +147,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
def withStr(s: String, version: String): InnerValLike = {
version match {
case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s)
- case VERSION1 => v1.InnerVal(None, Some(s), None)
+// case VERSION1 => v1.InnerVal(None, Some(s), None)
case _ => throw notSupportedEx(version)
}
}
@@ -162,40 +162,40 @@ object InnerVal extends HBaseDeserializableWithIsVertexId
{
// }
/** nasty implementation for backward compatibility */
- def convertVersion(innerVal: InnerValLike, dataType: String, toVersion:
String): InnerValLike = {
- val ret = toVersion match {
- case VERSION2 | VERSION3 | VERSION4 =>
- if (innerVal.isInstanceOf[v1.InnerVal]) {
- val obj = innerVal.asInstanceOf[v1.InnerVal]
- obj.valueType match {
- case "long" => InnerVal.withLong(obj.longV.get, toVersion)
- case "string" => InnerVal.withStr(obj.strV.get, toVersion)
- case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion)
- case _ => throw new Exception(s"InnerVal should be
[long/integeer/short/byte/string/boolean]")
- }
- } else {
- innerVal
- }
- case VERSION1 =>
- if (innerVal.isInstanceOf[v2.InnerVal]) {
- val obj = innerVal.asInstanceOf[v2.InnerVal]
- obj.value match {
- case str: String => InnerVal.withStr(str, toVersion)
- case b: Boolean => InnerVal.withBoolean(b, toVersion)
- case n: BigDecimal => InnerVal.withNumber(n, toVersion)
- case n: Long => InnerVal.withNumber(n, toVersion)
- case n: Double => InnerVal.withNumber(n, toVersion)
- case n: Int => InnerVal.withNumber(n, toVersion)
- case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion")
- }
- } else {
- innerVal
- }
- case _ => throw notSupportedEx(toVersion)
- }
-// logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret,
${innerVal.bytes.toList}, ${ret.bytes.toList}")
- ret
- }
+// def convertVersion(innerVal: InnerValLike, dataType: String, toVersion:
String): InnerValLike = {
+// val ret = toVersion match {
+// case VERSION2 | VERSION3 | VERSION4 =>
+// if (innerVal.isInstanceOf[v1.InnerVal]) {
+// val obj = innerVal.asInstanceOf[v1.InnerVal]
+// obj.valueType match {
+// case "long" => InnerVal.withLong(obj.longV.get, toVersion)
+// case "string" => InnerVal.withStr(obj.strV.get, toVersion)
+// case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion)
+// case _ => throw new Exception(s"InnerVal should be
[long/integeer/short/byte/string/boolean]")
+// }
+// } else {
+// innerVal
+// }
+//// case VERSION1 =>
+//// if (innerVal.isInstanceOf[v2.InnerVal]) {
+//// val obj = innerVal.asInstanceOf[v2.InnerVal]
+//// obj.value match {
+//// case str: String => InnerVal.withStr(str, toVersion)
+//// case b: Boolean => InnerVal.withBoolean(b, toVersion)
+//// case n: BigDecimal => InnerVal.withNumber(n, toVersion)
+//// case n: Long => InnerVal.withNumber(n, toVersion)
+//// case n: Double => InnerVal.withNumber(n, toVersion)
+//// case n: Int => InnerVal.withNumber(n, toVersion)
+//// case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion")
+//// }
+//// } else {
+//// innerVal
+//// }
+// case _ => throw notSupportedEx(toVersion)
+// }
+//// logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret,
${innerVal.bytes.toList}, ${ret.bytes.toList}")
+// ret
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index d280570..019ef67 100644
---
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -298,7 +298,7 @@ trait IntegrateCommon extends FunSuite with Matchers with
BeforeAndAfterAll {
],
"consistencyLevel": "strong",
"isDirected": true,
- "schemaVersion": "v1",
+ "schemaVersion": "v2",
"compressionAlgorithm": "gz"
}"""
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
index bab6e03..7228c5d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
@@ -30,7 +30,7 @@ class JsonParserTest extends FunSuite with Matchers with
TestCommon {
import types.HBaseType._
val innerValsPerVersion = for {
- version <- List(VERSION2, VERSION1)
+ version <- ValidVersions
} yield {
val innerVals = List(
(InnerVal.withStr("ABC123", version), STRING),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
index 0f8f833..3f6cd2a 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
@@ -44,15 +44,15 @@ trait TestCommon {
def lessThanEqual(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) <= 0
/** */
- import HBaseType.{VERSION1, VERSION2}
- private val tsValSmall = InnerVal.withLong(ts, VERSION1)
- private val tsValLarge = InnerVal.withLong(ts + 1, VERSION1)
- private val boolValSmall = InnerVal.withBoolean(false, VERSION1)
- private val boolValLarge = InnerVal.withBoolean(true, VERSION1)
- private val doubleValSmall = InnerVal.withDouble(-0.1, VERSION1)
- private val doubleValLarge = InnerVal.withDouble(0.1, VERSION1)
+ import HBaseType._
+ private val tsValSmall = InnerVal.withLong(ts, DEFAULT_VERSION)
+ private val tsValLarge = InnerVal.withLong(ts + 1, DEFAULT_VERSION)
+ private val boolValSmall = InnerVal.withBoolean(false, DEFAULT_VERSION)
+ private val boolValLarge = InnerVal.withBoolean(true, DEFAULT_VERSION)
+ private val doubleValSmall = InnerVal.withDouble(-0.1, DEFAULT_VERSION)
+ private val doubleValLarge = InnerVal.withDouble(0.1, DEFAULT_VERSION)
private val toSeq = LabelMeta.toSeq.toInt
- private val toVal = InnerVal.withLong(Long.MinValue, VERSION1)
+ private val toVal = InnerVal.withLong(Long.MinValue, DEFAULT_VERSION)
private val tsValSmallV2 = InnerVal.withLong(ts, VERSION2)
@@ -65,19 +65,19 @@ trait TestCommon {
val intVals = (Int.MinValue until Int.MinValue + 10) ++ (-129 to -126) ++
(-1 to 1) ++ (126 to 129) ++
(Int.MaxValue - 10 until Int.MaxValue)
- val intInnerVals = intVals.map { v => InnerVal.withNumber(BigDecimal(v),
VERSION1) }
+ val intInnerVals = intVals.map { v => InnerVal.withNumber(BigDecimal(v),
DEFAULT_VERSION) }
val intInnerValsV2 = intVals.map { v => InnerVal.withNumber(BigDecimal(v),
VERSION2) }
val stringVals = List("abc", "abd", "ac", "aca", "b")
- val stringInnerVals = stringVals.map { s => InnerVal.withStr(s, VERSION1)}
+ val stringInnerVals = stringVals.map { s => InnerVal.withStr(s,
DEFAULT_VERSION)}
val stringInnerValsV2 = stringVals.map { s => InnerVal.withStr(s, VERSION2)}
val numVals = (Long.MinValue until Long.MinValue + 10).map(BigDecimal(_)) ++
(Int.MinValue until Int.MinValue + 10).map(BigDecimal(_)) ++
(Int.MaxValue - 10 until Int.MaxValue).map(BigDecimal(_)) ++
(Long.MaxValue - 10 until Long.MaxValue).map(BigDecimal(_))
- val numInnerVals = numVals.map { n => InnerVal.withLong(n.toLong, VERSION1)}
+ val numInnerVals = numVals.map { n => InnerVal.withLong(n.toLong,
DEFAULT_VERSION)}
val numInnerValsV2 = numVals.map { n => InnerVal.withNumber(n, VERSION2)}
val doubleStep = Double.MaxValue / 5
@@ -86,16 +86,16 @@ trait TestCommon {
(-128.0 until 128.0 by 1.2).map(BigDecimal(_)) ++
(129.0 until 142.0 by 1.1).map(BigDecimal(_)) ++
(doubleStep until Double.MaxValue by doubleStep).map(BigDecimal(_))
- val doubleInnerVals = doubleVals.map { d => InnerVal.withDouble(d.toDouble,
VERSION1)}
+ val doubleInnerVals = doubleVals.map { d => InnerVal.withDouble(d.toDouble,
DEFAULT_VERSION)}
val doubleInnerValsV2 = doubleVals.map { d =>
InnerVal.withDouble(d.toDouble, VERSION2)}
/** version 1 string order is broken */
val idxPropsLs = Seq(
- Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ac",
VERSION1)),(toSeq -> toVal)),
- Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ab",
VERSION1)), (toSeq -> toVal)),
- Seq((0 -> tsValSmall), (1 -> boolValSmall), (2-> InnerVal.withStr("b",
VERSION1)), (toSeq -> toVal)),
- Seq((0 -> tsValSmall), (1 -> boolValLarge), (2 -> InnerVal.withStr("b",
VERSION1)), (toSeq -> toVal)),
- Seq((0 -> tsValLarge), (1 -> boolValSmall), (2 -> InnerVal.withStr("a",
VERSION1)), (toSeq -> toVal))
+ Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ac",
DEFAULT_VERSION)),(toSeq -> toVal)),
+ Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ab",
DEFAULT_VERSION)), (toSeq -> toVal)),
+ Seq((0 -> tsValSmall), (1 -> boolValSmall), (2-> InnerVal.withStr("b",
DEFAULT_VERSION)), (toSeq -> toVal)),
+ Seq((0 -> tsValSmall), (1 -> boolValLarge), (2 -> InnerVal.withStr("b",
DEFAULT_VERSION)), (toSeq -> toVal)),
+ Seq((0 -> tsValLarge), (1 -> boolValSmall), (2 -> InnerVal.withStr("a",
DEFAULT_VERSION)), (toSeq -> toVal))
).map(seq => seq.map(t => t._1.toByte -> t._2 ))
val idxPropsLsV2 = Seq(
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 1997354..4614bed 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -138,7 +138,7 @@ trait TestCommonWithModels {
implicit val session = AutoSession
management.createLabel(labelName, serviceName, columnName, columnType,
serviceName, columnName, columnType,
- isDirected = true, serviceName, testIdxProps, testProps,
consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
+ isDirected = true, serviceName, testIdxProps, testProps,
consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
management.createLabel(labelNameV2, serviceNameV2, columnNameV2,
columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
isDirected = true, serviceNameV2, testIdxProps, testProps,
consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index 25dd0e4..ad9299c 100644
---
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -32,7 +32,7 @@ import scala.util.{Random, Try}
class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels
{
initTests()
- import HBaseType.{VERSION1, VERSION2}
+ import HBaseType._
val ts = System.currentTimeMillis()
val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts,
label.schemaVersion)
@@ -60,7 +60,7 @@ class WhereParserTest extends FunSuite with Matchers with
TestCommonWithModels {
}
def ids = for {
- version <- Seq(VERSION1, VERSION2)
+ version <- ValidVersions
} yield {
val srcId = SourceVertexId(ServiceColumn.Default, InnerVal.withLong(1,
version))
val tgtId =
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index 5409d61..a5c974e 100644
---
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -46,8 +46,7 @@ class IndexEdgeTest extends FunSuite with Matchers with
TestCommonWithModels {
val labelOpt = Option(l)
val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts,
props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion)))
val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq ==
LabelIndex.DefaultSeq).head
- val _indexEdgeOpt =
graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(labelOpt,
- graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues,
l.schemaVersion, None)
+ val _indexEdgeOpt =
graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues,
None)
_indexEdgeOpt should not be empty
edge == _indexEdgeOpt.get should be(true)