Add S2EdgePropertyHelper and move the edge property helpers from S2Edge, S2EdgeBuilder and S2EdgeLike into it.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d3a2e75d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d3a2e75d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d3a2e75d

Branch: refs/heads/master
Commit: d3a2e75dfd40de0c2dc4878f14b30cfd179344c9
Parents: 2b5df1d
Author: DO YUNG YOON <[email protected]>
Authored: Tue Nov 7 08:31:45 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Nov 7 09:00:19 2017 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/PostProcess.scala   | 28 +++----
 .../org/apache/s2graph/core/QueryResult.scala   |  4 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 36 ++-------
 .../org/apache/s2graph/core/S2EdgeBuilder.scala | 11 +--
 .../org/apache/s2graph/core/S2EdgeLike.scala    | 81 ++++++++------------
 .../s2graph/core/S2EdgePropertyHelper.scala     | 70 +++++++++++++++++
 .../scala/org/apache/s2graph/core/S2Graph.scala |  6 +-
 .../org/apache/s2graph/core/S2VertexLike.scala  |  2 +-
 .../apache/s2graph/core/TraversalHelper.scala   | 16 ++--
 .../s2graph/core/parsers/WhereParser.scala      |  2 +-
 .../s2graph/core/storage/StorageReadable.scala  |  2 +-
 .../hbase/AsynchbaseStorageReadable.scala       |  2 +-
 .../tall/SnapshotEdgeSerializable.scala         |  4 +-
 .../wide/SnapshotEdgeSerializable.scala         |  4 +-
 .../rest/play/controllers/EdgeController.scala  |  2 +-
 15 files changed, 146 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 2d2e183..5118600 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -84,17 +84,17 @@ object PostProcess {
     val score = edgeWithScore.score
     val label = edgeWithScore.label
     if (isDegree) {
-      builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
+      builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
       builder += ("label" -> anyValToJsValue(label.label).get)
-      builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
+      builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get)
       builder += (LabelMeta.degree.name -> 
anyValToJsValue(s2Edge.propertyValueInner(LabelMeta.degree).innerVal.value).get)
       JsObject(builder)
     } else {
       if (queryOption.withScore) builder += ("score" -> 
anyValToJsValue(score).get)
 
       if (queryOption.selectColumns.isEmpty) {
-        builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
-        builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get)
+        builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
+        builder += ("to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get)
         builder += ("label" -> anyValToJsValue(label.label).get)
 
         val innerProps = ArrayBuffer.empty[(String, JsValue)]
@@ -107,23 +107,23 @@ object PostProcess {
 
 
         builder += ("props" -> JsObject(innerProps))
-        builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
-        builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get)
-        builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) // 
backward compatibility
+        builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get)
+        builder += ("timestamp" -> 
anyValToJsValue(s2Edge.getTsInnerValValue()).get)
+        builder += ("_timestamp" -> 
anyValToJsValue(s2Edge.getTsInnerValValue()).get) // backward compatibility
         if (parents != JsNull) builder += ("parents" -> parents)
         //          Json.toJson(builder.result())
         JsObject(builder)
       } else {
         queryOption.selectColumnsMap.foreach { case (columnName, _) =>
           columnName match {
-            case "from" => builder += ("from" -> 
anyValToJsValue(s2Edge.srcId).get)
-            case "_from" => builder += ("_from" -> 
anyValToJsValue(s2Edge.srcId).get)
-            case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get)
-            case "_to" => builder += ("_to" -> 
anyValToJsValue(s2Edge.tgtId).get)
+            case "from" => builder += ("from" -> 
anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
+            case "_from" => builder += ("_from" -> 
anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
+            case "to" => builder += ("to" -> 
anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get)
+            case "_to" => builder += ("_to" -> 
anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get)
             case "label" => builder += ("label" -> 
anyValToJsValue(label.label).get)
-            case "direction" => builder += ("direction" -> 
anyValToJsValue(s2Edge.direction).get)
-            case "timestamp" => builder += ("timestamp" -> 
anyValToJsValue(s2Edge.tsInnerVal).get)
-            case "_timestamp" => builder += ("_timestamp" -> 
anyValToJsValue(s2Edge.tsInnerVal).get)
+            case "direction" => builder += ("direction" -> 
anyValToJsValue(s2Edge.getDirection()).get)
+            case "timestamp" => builder += ("timestamp" -> 
anyValToJsValue(s2Edge.getTsInnerValValue()).get)
+            case "_timestamp" => builder += ("_timestamp" -> 
anyValToJsValue(s2Edge.getTsInnerValValue()).get)
             case _ => // should not happen
 
           }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 9bd3cdb..b654e71 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -237,7 +237,7 @@ object StepResult {
 
           //          val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
           val newOrderByValues =
-            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.tsInnerVal, None, None)
+            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTsInnerValValue(), None, None)
             else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
 
           val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys)
@@ -262,7 +262,7 @@ object StepResult {
 //            val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
 
             val newOrderByValues =
-              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.tsInnerVal, None, None)
+              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTsInnerValValue(), None, None)
               else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
 
             val newGroupByValues = 
newT.toValues(globalQueryOption.groupBy.keys)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 03678c8..9f5093c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -592,7 +592,7 @@ object S2Edge {
       for {
         (requestEdge, func) <- requestWithFuncs
       } {
-        val (_newPropsWithTs, _) = func((prevPropsWithTs, 
propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer))
+        val (_newPropsWithTs, _) = func((prevPropsWithTs, 
propsToState(requestEdge.propsWithTs), requestEdge.ts, 
requestEdge.innerLabel.schemaVersion))
         prevPropsWithTs = _newPropsWithTs
         //        
logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
       }
@@ -814,8 +814,8 @@ object S2Edge {
     if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else 
vertex.id.column
 
   def srcForVertex(e: S2EdgeLike): S2VertexLike = {
-    val belongLabelIds = Seq(e.labelWithDir.labelId)
-    if (e.labelWithDir.dir == GraphUtil.directions("in")) {
+    val belongLabelIds = Seq(e.getLabelId())
+    if (e.getDir() == GraphUtil.directions("in")) {
       val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
       e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), 
e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
     } else {
@@ -825,8 +825,8 @@ object S2Edge {
   }
 
   def tgtForVertex(e: S2EdgeLike): S2VertexLike = {
-    val belongLabelIds = Seq(e.labelWithDir.labelId)
-    if (e.labelWithDir.dir == GraphUtil.directions("in")) {
+    val belongLabelIds = Seq(e.getLabelId())
+    if (e.getDir() == GraphUtil.directions("in")) {
       val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
       e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), 
e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
     } else {
@@ -835,28 +835,6 @@ object S2Edge {
     }
   }
 
-  def updatePropsWithTs(e: S2EdgeLike, others: Props = S2Edge.EmptyProps): 
Props = {
-    val emptyProp = S2Edge.EmptyProps
-
-    e.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = 
emptyProp.put(key, value)
-    })
-
-    others.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = 
emptyProp.put(key, value)
-    })
-
-    emptyProp
-  }
-
-  def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = {
-    key match {
-      case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, 
e.ts))
-      case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, 
e.ts))
-      case "label" => 
Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, 
e.innerLabel.schemaVersion), e.ts))
-      case "direction" => 
Option(InnerValLikeWithTs(InnerVal.withStr(e.direction, 
e.innerLabel.schemaVersion), e.ts))
-      case _ =>
-        e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => 
e.propertyValueInner(labelMeta))
-    }
-  }
+  def serializePropsWithTs(edge: S2EdgeLike): Array[Byte] =
+    
HBaseSerializable.propsToKeyValuesWithTs(edge.getPropsWithTs().asScala.map(kv 
=> kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
index ea9598e..2ea1504 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -22,7 +22,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) {
 
   def propsPlusTsValid = edge.getPropsWithTs().asScala.filter(kv => 
LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
 
-  def labelOrders = LabelIndex.findByLabelIdAll(edge.labelWithDir.labelId)
+  def labelOrders = LabelIndex.findByLabelIdAll(edge.getLabelId())
 
   def edgesWithIndex = for (labelOrder <- labelOrders) yield {
     IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, 
edge.innerLabel, edge.getDir(), edge.getOp(),
@@ -35,7 +35,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) {
   }
 
   def relatedEdges: Seq[S2EdgeLike] = {
-    if (edge.labelWithDir.isDirected) {
+    if (edge.isDirected()) {
       val skipReverse = 
edge.innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
       if (skipReverse) Seq(edge) else Seq(edge, duplicateEdge)
     } else {
@@ -93,11 +93,4 @@ class S2EdgeBuilder(edge: S2EdgeLike) {
     else
       EdgeId(VertexId(tgtColumn, edge.tgtVertex.id.innerId), 
VertexId(srcColumn, edge.srcVertex.id.innerId), edge.label(), "out", timestamp)
   }
-
-  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
-    val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new 
RuntimeException(s"$key is not configured on Edge."))
-    val newProp = new S2Property[V](edge, labelMeta, key, value, ts)
-    edge.getPropsWithTs().put(key, newProp)
-    newProp
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index 9963be7..33e7e83 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -19,6 +19,7 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   val builder: S2EdgeBuilder = new S2EdgeBuilder(this)
 
+
   val innerGraph: S2Graph
   val srcVertex: S2VertexLike
   var tgtVertex: S2VertexLike
@@ -35,20 +36,35 @@ trait S2EdgeLike extends Edge with GraphElement {
   val lockTs: Option[Long] = None
 //  var tsInnerValOpt: Option[InnerValLike] = None
 
-  lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
-  lazy val schemaVer = innerLabel.schemaVersion
   lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match 
{
     case b: BigDecimal => b.longValue()
     case l: Long => l
     case i: Int => i.toLong
     case _ => throw new RuntimeException("ts should be in 
[BigDecimal/Long/Int].")
   }
-  lazy val operation = GraphUtil.fromOp(op)
-  lazy val tsInnerVal = tsInnerValOpt.get.value
-  lazy val srcId = srcVertex.innerIdVal
-  lazy val tgtId = tgtVertex.innerIdVal
-  lazy val labelName = innerLabel.label
-  lazy val direction = GraphUtil.fromDirection(dir)
+
+  private lazy val operation = GraphUtil.fromOp(op)
+  private lazy val direction = GraphUtil.fromDirection(dir)
+  private lazy val tsInnerVal = tsInnerValOpt.get.value
+
+  def graph(): Graph = innerGraph
+
+  lazy val edgeId: EdgeId = builder.edgeId
+
+  def id(): AnyRef = edgeId
+
+  def label(): String = innerLabel.label
+
+  def getLabelId(): Int = innerLabel.id.get
+
+  def getDirection(): String = direction
+
+  def getOperation(): String = operation
+
+  def getTsInnerValValue(): Any = tsInnerVal
+
+  def isDirected(): Boolean =
+    getDir() == 0 || getDir() == 1
 
   def getTs(): Long = ts
   def getOriginalEdgeOpt(): Option[S2EdgeLike] = originalEdgeOpt
@@ -68,43 +84,20 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, 
srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
 
-  def serializePropsWithTs(): Array[Byte] = 
HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => 
kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
-
   def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props =
-    S2Edge.updatePropsWithTs(this, others)
+    S2EdgePropertyHelper.updatePropsWithTs(this, others)
 
-  def propertyValue(key: String): Option[InnerValLikeWithTs] = 
S2Edge.propertyValue(this, key)
+  def propertyValue(key: String): Option[InnerValLikeWithTs] = 
S2EdgePropertyHelper.propertyValue(this, key)
 
-  def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = {
-    //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
-    if (propsWithTs.containsKey(labelMeta.name)) {
-      propsWithTs.get(labelMeta.name).innerValWithTs
-    } else {
-      innerLabel.metaPropsDefaultMapInner(labelMeta)
-    }
-  }
+  def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs =
+    S2EdgePropertyHelper.propertyValueInner(this, labelMeta)
 
   def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, 
InnerValLikeWithTs] = {
-    val labelMetas = for {
-      key <- keys
-      labelMeta <- innerLabel.metaPropsInvMap.get(key)
-    } yield labelMeta
-
-    propertyValuesInner(labelMetas)
+    S2EdgePropertyHelper.propertyValuesInner(this, 
S2EdgePropertyHelper.toLabelMetas(this, keys))
   }
 
-  def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, 
InnerValLikeWithTs] = {
-    if (labelMetas.isEmpty) {
-      innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
-        labelMeta -> propertyValueInner(labelMeta)
-      }
-    } else {
-      // This is important since timestamp is required for all edges.
-      (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
-        labelMeta -> propertyValueInner(labelMeta)
-      }.toMap
-    }
-  }
+  def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, 
InnerValLikeWithTs] =
+    S2EdgePropertyHelper.propertyValuesInner(this, labelMetas)
 
   def relatedEdges = builder.relatedEdges
 
@@ -114,7 +107,6 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def duplicateEdge = builder.duplicateEdge
 
-  //  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
   def reverseDirEdge = builder.reverseDirEdge
 
   def reverseSrcTgtEdge = builder.reverseSrcTgtEdge
@@ -215,7 +207,8 @@ trait S2EdgeLike extends Edge with GraphElement {
     v
   }
 
-  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = 
builder.propertyInner(key, value, ts)
+  def propertyInner[V](key: String, value: V, ts: Long): Property[V] =
+    S2EdgePropertyHelper.propertyInner(this, key, value, ts)
 
   def remove(): Unit = {
     if (graph.features().edge().supportsRemoveEdges()) {
@@ -231,14 +224,6 @@ trait S2EdgeLike extends Edge with GraphElement {
     }
   }
 
-  def graph(): Graph = innerGraph
-
-  lazy val edgeId: EdgeId = builder.edgeId
-
-  def id(): AnyRef = edgeId
-
-  def label(): String = innerLabel.label
-
   def toLogString: String = {
     //    val allPropsWithName = defaultPropsWithName ++ 
Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, 
innerLabel.label, propsWithTs).mkString("\t")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
new file mode 100644
index 0000000..2d24a2e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
@@ -0,0 +1,70 @@
+package org.apache.s2graph.core
+
+import java.util.function.BiConsumer
+
+import org.apache.s2graph.core.S2Edge.Props
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+import org.apache.tinkerpop.gremlin.structure.Property
+
+object S2EdgePropertyHelper {
+  def propertyInner[V](edge: S2EdgeLike, key: String, value: V, ts: Long): 
Property[V] = {
+    val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new 
RuntimeException(s"$key is not configured on Edge."))
+    val newProp = new S2Property[V](edge, labelMeta, key, value, ts)
+    edge.getPropsWithTs().put(key, newProp)
+    newProp
+  }
+  def updatePropsWithTs(edge: S2EdgeLike, others: Props = S2Edge.EmptyProps): 
Props = {
+    val emptyProp = S2Edge.EmptyProps
+
+    edge.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = 
emptyProp.put(key, value)
+    })
+
+    others.forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = 
emptyProp.put(key, value)
+    })
+
+    emptyProp
+  }
+
+  def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = {
+    key match {
+      case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, 
e.ts))
+      case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, 
e.ts))
+      case "label" => 
Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, 
e.innerLabel.schemaVersion), e.ts))
+      case "direction" => 
Option(InnerValLikeWithTs(InnerVal.withStr(e.getDirection(), 
e.innerLabel.schemaVersion), e.ts))
+      case _ =>
+        e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => 
e.propertyValueInner(labelMeta))
+    }
+  }
+
+  def propertyValuesInner(edge: S2EdgeLike, labelMetas: Seq[LabelMeta] = Nil): 
Map[LabelMeta, InnerValLikeWithTs] = {
+    if (labelMetas.isEmpty) {
+      edge.innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, 
defaultVal) =>
+        labelMeta -> edge.propertyValueInner(labelMeta)
+      }
+    } else {
+      // This is important since timestamp is required for all edges.
+      (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
+        labelMeta -> propertyValueInner(edge, labelMeta)
+      }.toMap
+    }
+  }
+
+  def propertyValueInner(edge: S2EdgeLike, labelMeta: LabelMeta): 
InnerValLikeWithTs = {
+    //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
+    if (edge.getPropsWithTs().containsKey(labelMeta.name)) {
+      edge.getPropsWithTs().get(labelMeta.name).innerValWithTs
+    } else {
+      edge.innerLabel.metaPropsDefaultMapInner(labelMeta)
+    }
+  }
+
+  def toLabelMetas(edge: S2EdgeLike, keys: Seq[String]): Seq[LabelMeta] = {
+    for {
+      key <- keys
+      labelMeta <- edge.innerLabel.metaPropsInvMap.get(key)
+    } yield labelMeta
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index f061160..3270e84 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -663,13 +663,13 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
                 buildLastStepInnerResult: Boolean = false): Future[StepResult] 
= {
     if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
     else {
-      val (alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean], 
prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], 
queryRequests: Seq[QueryRequest]) =
+      val (_, prevStepTgtVertexIdEdges: Map[VertexId, 
ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) =
         traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, 
stepInnerResult)
 
       val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
 
       traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests,
-        fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, 
buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
+        fetchedLs, orgQuery.steps(stepIdx).queryParams, 
buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
     }
   }
 
@@ -883,7 +883,7 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
     val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case 
(edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") }
 
     val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
-      deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), 
edge.labelWithDir.dir, edge.ts).map(idx -> _)
+      deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), 
edge.getDir(), edge.ts).map(idx -> _)
     }
 
     val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => 
edge.innerLabel }.map { case (label, edgeGroup) =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index b88c18d..9ec2ab0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -73,7 +73,7 @@ trait S2VertexLike extends Vertex with GraphElement {
       override def accept(edge: Edge): Unit = {
         val s2Edge = edge.asInstanceOf[S2EdgeLike]
 
-        s2Edge.direction match {
+        s2Edge.getDirection() match {
           case "out" => arr.add(edge.inVertex())
           case "in" => arr.add(edge.outVertex())
           case _ => throw new IllegalStateException("only out/in direction can 
be found in S2Edge")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 58da145..25b909e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -43,12 +43,12 @@ object TraversalHelper {
     }
   }
 
-  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): 
Map[(LabelWithDirection, S2VertexLike), Boolean] = {
+  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(Int, 
Int, S2VertexLike), Boolean] = {
     val vertices = for {
       edgeWithScore <- edgeWithScoreLs
       edge = edgeWithScore.edge
-      vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) 
edge.tgtVertex else edge.srcVertex
-    } yield (edge.labelWithDir, vertex) -> true
+      vertex = if (edge.getDir() == GraphUtil.directions("out")) 
edge.tgtVertex else edge.srcVertex
+    } yield (edge.getLabelId(), edge.getDir(), vertex) -> true
 
     vertices.toMap
   }
@@ -114,7 +114,7 @@ object TraversalHelper {
   def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): 
(HashKey, FilterHashKey) = {
     val src = edge.srcVertex.innerId.hashCode()
     val tgt = edge.tgtVertex.innerId.hashCode()
-    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, 
isDegree)
+    val hashKey = (src, edge.getLabelId(), edge.getDir(), tgt, isDegree)
     val filterHashKey = (src, tgt)
 
     (hashKey, filterHashKey)
@@ -136,7 +136,7 @@ class TraversalHelper(graph: S2GraphLike) {
     val step = q.steps(stepIdx)
 
     val alreadyVisited =
-      if (stepIdx == 0) Map.empty[(LabelWithDirection, S2VertexLike), Boolean]
+      if (stepIdx == 0) Map.empty[(Int, Int, S2VertexLike), Boolean]
       else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
 
     val initial = (Map.empty[S2VertexLike, Double], Map.empty[S2VertexLike, 
ArrayBuffer[EdgeWithScore]])
@@ -172,7 +172,6 @@ class TraversalHelper(graph: S2GraphLike) {
                   queryRequests: Seq[QueryRequest],
                   queryResultLsFuture: Future[Seq[StepResult]],
                   queryParams: Seq[QueryParam],
-                  alreadyVisited: Map[(LabelWithDirection, S2VertexLike), 
Boolean] = Map.empty,
                   buildLastStepInnerResult: Boolean = true,
                   parentEdges: Map[VertexId, Seq[EdgeWithScore]])
                  (implicit ec: scala.concurrent.ExecutionContext): 
Future[StepResult] = {
@@ -214,7 +213,6 @@ class TraversalHelper(graph: S2GraphLike) {
           val _results = buildResult(q, stepIdx, currentStepResults, 
parentEdges) { (edgeWithScore, propsSelectColumns) =>
             val edge = edgeWithScore.edge
             val score = edgeWithScore.score
-            val label = edgeWithScore.label
 
             /* Select */
             val mergedPropsWithTs = 
edge.propertyValuesInner(propsSelectColumns)
@@ -225,7 +223,7 @@ class TraversalHelper(graph: S2GraphLike) {
             val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
             /* OrderBy */
             val orderByValues =
-              if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, 
None, None)
+              if (queryOption.orderByKeys.isEmpty) (score, 
edge.getTsInnerValValue(), None, None)
               else 
StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
 
             /* StepGroupBy */
@@ -296,8 +294,6 @@ class TraversalHelper(graph: S2GraphLike) {
     val parents = if (shouldBuildParents) {
       parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore =>
         val edge = edgeWithScore.edge
-        val score = edgeWithScore.score
-        val label = edgeWithScore.label
 
         /* Select */
         val mergedPropsWithTs =

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index a0d56b2..d947066 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -54,7 +54,7 @@ trait ExtractValue {
       val (propKey, _) = findParentEdge(edge, key)
 
       val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw 
WhereParserException(s"Where clause contains not existing property name: 
$propKey"))
-      val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.labelWithDir.dir)
+      val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.getDir())
       val dataType = propKey match {
         case "_to" | "to" => tgtColumn.columnType
         case "_from" | "from" => srcColumn.columnType

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 44bd4dc..79ca8aa 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -50,7 +50,7 @@ trait StorageReadable {
 
   def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): 
Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
     val queryParam = QueryParam(labelName = edge.innerLabel.label,
-      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+      direction = GraphUtil.fromDirection(edge.getDir()),
       tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
       cacheTTLInMillis = -1)
     val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index bdb6e99..5f54e47 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -237,7 +237,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
 
               serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
                 .fromKeyValues(Seq(kv), None)
-                .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree)
+                .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
             }
           }
       }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 24775e7..12edf54 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.SnapshotEdge
+import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
 import org.apache.s2graph.core.mysqls.LabelIndex
 import org.apache.s2graph.core.storage.serde._
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
@@ -54,7 +54,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
       case Some(pendingEdge) =>
         val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp())
         val versionBytes = Array.empty[Byte]
-        val propsBytes = pendingEdge.serializePropsWithTs()
+        val propsBytes = S2Edge.serializePropsWithTs(pendingEdge)
         val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
 
         Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 44f2596..02a72b1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.SnapshotEdge
+import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
 import org.apache.s2graph.core.mysqls.LabelIndex
 import org.apache.s2graph.core.storage.serde.Serializable
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
@@ -61,7 +61,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
       case Some(pendingEdge) =>
         val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp())
         val versionBytes = Array.empty[Byte]
-        val propsBytes = pendingEdge.serializePropsWithTs()
+        val propsBytes = S2Edge.serializePropsWithTs(pendingEdge)
         val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
         Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 2b31fcd..69878f8 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -95,7 +95,7 @@ object EdgeController extends Controller {
       results.get.zip(elements).map {
         case (r: MutateResponse, (e: S2EdgeLike, tsv: String)) if !r.isSuccess =>
           val kafkaMessages = if(e.getOp() == GraphUtil.operations("deleteAll")){
-            toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts)
+            toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.getDir(), e.ts)
           } else{
             Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))
           }

Reply via email to