bug fix on S2EdgeDataFrameWriter.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/31b51929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/31b51929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/31b51929

Branch: refs/heads/master
Commit: 31b51929c4073d761bb03541001e07216cf8faa8
Parents: a0ce6f8
Author: DO YUNG YOON <[email protected]>
Authored: Thu Apr 5 19:00:23 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Apr 5 19:00:53 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  2 +-
 .../s2jobs/serde/GraphElementReadable.scala     |  2 +-
 .../s2jobs/serde/reader/IdentityReader.scala    |  4 +-
 .../serde/reader/RowBulkFormatReader.scala      |  4 +-
 .../s2jobs/serde/reader/S2GraphCellReader.scala | 64 +++++---------
 .../serde/reader/TsvBulkFormatReader.scala      |  4 +-
 .../writer/GraphElementDataFrameWriter.scala    | 49 -----------
 .../serde/writer/S2EdgeDataFrameWriter.scala    | 50 +++++++++++
 .../serde/writer/S2VertexDataFrameWriter.scala  | 51 ++++++++++++
 .../org/apache/s2graph/s2jobs/task/Source.scala | 24 ++++--
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 88 ++++++++++++++++++--
 11 files changed, 227 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index c105448..a0c14e0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -37,7 +37,7 @@ import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.{GraphElementDataFrameWriter, IdentityWriter, KeyValueWriter}
+import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, IdentityWriter, KeyValueWriter}
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
index 0544a84..148dc66 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
@@ -22,5 +22,5 @@ package org.apache.s2graph.s2jobs.serde
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 
 trait GraphElementReadable[S] extends Serializable {
-  def read(graph: S2Graph)(data: S): Option[GraphElement]
+  def read(graph: S2Graph)(data: S): Seq[GraphElement]
 }
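
Note: the contract change above means one input record may now decode to
several graph elements, and an empty Seq takes the place of the old None.
A minimal sketch of a custom reader under the new signature; the
MultiLineReader name and its newline-splitting behavior are illustrative
only, not part of s2jobs:

import org.apache.s2graph.core.{GraphElement, S2Graph}
import org.apache.s2graph.s2jobs.serde.GraphElementReadable

// Hypothetical reader: decodes each newline-separated bulk-format line,
// dropping lines that fail to parse. Seq.empty replaces the old None.
class MultiLineReader extends GraphElementReadable[String] {
  override def read(graph: S2Graph)(data: String): Seq[GraphElement] =
    data.split("\n").toSeq.flatMap(graph.elementBuilder.toGraphElement(_))
}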

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
index a4d985b..8652170 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
@@ -23,6 +23,6 @@ import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
 class IdentityReader extends GraphElementReadable[GraphElement] {
-  override def read(graph: S2Graph)(data: GraphElement): Option[GraphElement] =
-    Option(data)
+  override def read(graph: S2Graph)(data: GraphElement): Seq[GraphElement] =
+    Seq(data)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
index 12b2ba2..de8a365 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.Row
 class RowBulkFormatReader extends GraphElementReadable[Row] {
   private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
 
-  override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
-    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN)
+  override def read(s2: S2Graph)(row: Row): Seq[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN).toSeq
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
index f5487ab..454294e 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
@@ -27,54 +27,28 @@ import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
 class S2GraphCellReader(elementType: String) extends GraphElementReadable[Seq[Cell]]{
-  override def read(s2: S2Graph)(cells: Seq[Cell]): Option[GraphElement] = {
-    if (cells.isEmpty) None
-    else {
-      //TODO:
-      val cell = cells.head
-      val schemaVer = HBaseType.DEFAULT_VERSION
-      val cf = cell.getFamily
-
-      val kvs = cells.map { cell =>
-        new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
-          cell.getValue, cell.getTimestamp, SKeyValue.Default)
-      }
-      elementType match {
-        case "IndexEdge" =>
-          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
-            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as e")
+  override def read(s2: S2Graph)(cells: Seq[Cell]): Seq[GraphElement] = {
+    val schemaVer = HBaseType.DEFAULT_VERSION
+    val kvs = cells.map { cell =>
+      new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
+        cell.getValue, cell.getTimestamp, SKeyValue.Default)
+    }
 
+    elementType.toLowerCase match {
+      case "vertex" | "v" =>
+        s2.defaultStorage.serDe.vertexDeserializer(schemaVer)
+          .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement]).toSeq
+      case "indexedge" | "ie" =>
+        kvs.flatMap { kv =>
           s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer)
-            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-        case "SnapshotEdge" =>
-          //TODO: replace this to use separate column family: SKeyValue.SnapshotEdgeCF
-          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
-            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as e")
-
+            .fromKeyValues(Seq(kv), None).map(_.asInstanceOf[GraphElement])
+        }
+      case "snapshotedge" | "se" =>
+        kvs.flatMap { kv =>
           s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer)
-            .fromKeyValues(kvs, None).map(_.toEdge.asInstanceOf[GraphElement])
-        case "Vertex" =>
-          if (!Bytes.equals(cf, SKeyValue.VertexCf))
-            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as v")
-
-          s2.defaultStorage.serDe.vertexDeserializer(schemaVer)
-            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-        case _ => throw new IllegalArgumentException(s"$elementType is not supported column family.")
-      }
-//      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
-//        s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-//      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
-//        val indexEdgeOpt = s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
-//        if (indexEdgeOpt.isDefined) indexEdgeOpt.map(_.asInstanceOf[GraphElement])
-//        else {
-//          //TODO: Current version use same column family for snapshotEdge and indexEdge.
-//
-//          val snapshotEdgeOpt = s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
-////          if (snapshotEdgeOpt.isDefined) snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
-////          else throw new IllegalStateException(s"column family indicate this is edge, but neither snapshot/index edge.")
-//          None
-//        }
-//      } else throw new IllegalStateException(s"wrong column family. ${Bytes.toString(cf)}")
+            .fromKeyValues(Seq(kv), None).map(_.asInstanceOf[GraphElement])
+        }
+      case _ => throw new IllegalArgumentException(s"$elementType is not supported.")
     }
   }
 }
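
Note on the refactor above: element.type is now matched case-insensitively
and short aliases are accepted. A usage sketch, assuming the relevant imports
plus an S2Graph instance s2 and a Seq[Cell] named cells are already in scope:

// "Vertex", "vertex", or "v" all select the vertex deserializer; likewise
// "IndexEdge"/"ie" and "SnapshotEdge"/"se" for the two edge forms.
val vertexReader = new S2GraphCellReader("v")
val edgeReader = new S2GraphCellReader("IndexEdge")

// read now returns Seq[GraphElement]; an unrecognized type throws
// IllegalArgumentException instead of the old per-column-family checks.
val vertices: Seq[GraphElement] = vertexReader.read(s2)(cells)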

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
index 5465517..963a7d8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
@@ -23,7 +23,7 @@ import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
 class TsvBulkFormatReader extends GraphElementReadable[String] {
-  override def read(graph: S2Graph)(data: String): Option[GraphElement] = {
-    graph.elementBuilder.toGraphElement(data)
+  override def read(graph: S2Graph)(data: String): Seq[GraphElement] = {
+    graph.elementBuilder.toGraphElement(data).toSeq
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
deleted file mode 100644
index b6cbbb3..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde.writer
-
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.DegreeKey
-import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-
-object GraphElementDataFrameWriter {
-  type GraphElementTuple = (Long, String, String, String, String, String, String, String)
-  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
-}
-
-class GraphElementDataFrameWriter extends GraphElementWritable[GraphElementDataFrameWriter.GraphElementTuple] {
-  import GraphElementDataFrameWriter._
-  private def toGraphElementTuple(tokens: Array[String]): GraphElementTuple = {
-    tokens match {
-      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, elem, from, to, label, props, dir)
-      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, elem, from, to, label, props, "out")
-      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
-    }
-  }
-  override def write(s2: S2Graph)(element: GraphElement): GraphElementTuple = {
-    toGraphElementTuple(element.toLogString().split("\t"))
-  }
-
-  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): GraphElementTuple = {
-    val element = DegreeKey.toEdge(s2, degreeKey, count)
-
-    toGraphElementTuple(element.toLogString().split("\t"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
new file mode 100644
index 0000000..c2c305c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.s2graph.s2jobs.serde.writer.S2EdgeDataFrameWriter.S2EdgeTuple
+
+object S2EdgeDataFrameWriter {
+  type S2EdgeTuple = (Long, String, String, String, String, String, String, String)
+  val Fields = Seq("timestamp", "operation", "elem", "from", "to", "label", "props", "direction")
+}
+
+class S2EdgeDataFrameWriter extends GraphElementWritable[S2EdgeTuple] {
+  import S2EdgeDataFrameWriter._
+  private def toGraphElementTuple(tokens: Array[String]): S2EdgeTuple = {
+    tokens match {
+      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, elem, from, to, label, props, dir)
+      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, elem, from, to, label, props, "out")
+      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
+    }
+  }
+  override def write(s2: S2Graph)(element: GraphElement): S2EdgeTuple = {
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): S2EdgeTuple = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+}
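
The writer above flattens an edge's toLogString() output into the eight
Fields of S2EdgeTuple. A self-contained sketch of that layout; parse
replicates the private toGraphElementTuple logic purely for illustration
and is not part of the s2jobs API:

object S2EdgeTupleDemo {
  type S2EdgeTuple = (Long, String, String, String, String, String, String, String)

  // Mirrors the match arms above: a 7-token bulk line defaults direction to "out".
  def parse(logLine: String): S2EdgeTuple = logLine.split("\t") match {
    case Array(ts, op, elem, from, to, label, props, dir) =>
      (ts.toLong, op, elem, from, to, label, props, dir)
    case Array(ts, op, elem, from, to, label, props) =>
      (ts.toLong, op, elem, from, to, label, props, "out")
    case tokens =>
      throw new IllegalStateException(s"${tokens.toList} is malformed.")
  }

  def main(args: Array[String]): Unit =
    // prints (1416236400000,insert,edge,a,b,friends,{},out)
    println(parse("1416236400000\tinsert\tedge\ta\tb\tfriends\t{}"))
}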

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
new file mode 100644
index 0000000..c37f78e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph, S2VertexLike}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.s2graph.s2jobs.serde.writer.S2VertexDataFrameWriter.S2VertexTuple
+
+object S2VertexDataFrameWriter {
+  type S2VertexTuple = (Long, String, String, String, String, String, String)
+  val EmptyS2VertexTuple = (0L, "", "", "", "", "", "")
+  val Fields = Seq("timestamp", "operation", "elem", "id", "service", 
"column", "props")
+}
+
+class S2VertexDataFrameWriter extends GraphElementWritable[S2VertexTuple] {
+  import S2VertexDataFrameWriter._
+  private def toVertexTuple(tokens: Array[String]): S2VertexTuple = {
+    tokens match {
+      case Array(ts, op, elem, id, service, column, props) => (ts.toLong, op, elem, id, service, column, props)
+      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
+    }
+  }
+  override def write(s2: S2Graph)(element: GraphElement): S2VertexTuple = {
+    element match {
+      case v: S2VertexLike => toVertexTuple(v.toLogString().split("\t"))
+      case _ => throw new IllegalArgumentException(s"Vertex expected, $element 
is not vertex.")
+    }
+
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): S2VertexTuple =
+    EmptyS2VertexTuple
+}
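
Unlike the edge writer, the vertex writer emits seven columns and degrees do
not apply, so writeDegree simply returns EmptyS2VertexTuple. A hedged sketch
of the row shape, assuming S2VertexDataFrameWriter is imported; the service
and column names are placeholders:

// Fields: timestamp, operation, elem, id, service, column, props
val sampleVertexRow: S2VertexDataFrameWriter.S2VertexTuple =
  (1416236400000L, "insert", "v", "c", "serviceName", "columnName", "{}")
// write throws IllegalArgumentException for non-vertex elements, so the
// cf == "v" branch in Source.scala below must only feed it vertices.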

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 7b52415..06a28c8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.s2jobs.task
 import org.apache.s2graph.core.{GraphElement, Management}
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
-import org.apache.s2graph.s2jobs.serde.writer.GraphElementDataFrameWriter
+import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, S2VertexDataFrameWriter}
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
 
 
@@ -122,7 +122,9 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
     val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
     val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
     val labelMapping = Map.empty[String, String]
-    val buildDegree = conf.options.getOrElse("build.degree", "false").toBoolean
+    val buildDegree =
+      if (columnFamily == "v") false
+      else conf.options.getOrElse("build.degree", "false").toBoolean
     val elementType = conf.options.getOrElse("element.type", "IndexEdge")
 
     val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
@@ -130,11 +132,21 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
 
 
     implicit val reader = new S2GraphCellReader(elementType)
-    implicit val writer = new GraphElementDataFrameWriter
 
-    val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
-    val kvs = transformer.transform(cells)
+    columnFamily match {
+      case "v" =>
+        implicit val writer = new S2VertexDataFrameWriter
+        val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
+        val kvs = transformer.transform(cells)
 
-    kvs.toDF(GraphElementDataFrameWriter.Fields: _*)
+        kvs.toDF(S2VertexDataFrameWriter.Fields: _*)
+      case "e" =>
+        implicit val writer = new S2EdgeDataFrameWriter
+        val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
+        val kvs = transformer.transform(cells)
+
+        kvs.toDF(S2EdgeDataFrameWriter.Fields: _*)
+      case _ => throw new IllegalArgumentException(s"$columnFamily is not 
supported.")
+    }
   }
 }
\ No newline at end of file
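
A sketch of driving the new column-family dispatch from task options,
modeled on the dumpArgs used in SourceTest below; the rootdir and table
names are placeholders, and TaskConf, S2GraphSource, and a SparkSession
named spark are assumed to be in scope:

val vertexDumpArgs = Map(
  "hbase.rootdir" -> "hdfs://localhost:8020/hbase", // placeholder
  "restore.path" -> "/tmp/hbase_restore",
  "hbase.table.names" -> "my-table-snapshot",       // placeholder
  "hbase.table.cf" -> "v",       // selects S2VertexDataFrameWriter
  "element.type" -> "Vertex"     // forwarded to S2GraphCellReader
)
val source = new S2GraphSource(TaskConf("dump", "sql", Seq("input"), vertexDumpArgs))
val df = source.toDF(spark)

// With "hbase.table.cf" -> "e" and "element.type" -> "IndexEdge" the source
// uses S2EdgeDataFrameWriter instead; any other cf throws
// IllegalArgumentException, and build.degree is forced to false when cf == "v".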

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 6e712a6..9cd52eb 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -20,16 +20,28 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.core.{GraphUtil, S2EdgeLike, S2VertexLike}
 import org.apache.s2graph.core.storage.hbase.{AsynchbaseStorage, AsynchbaseStorageManagement}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, S2VertexDataFrameWriter}
 import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper}
 import org.apache.spark.sql.DataFrame
 
 import scala.collection.JavaConverters._
 
 class SourceTest extends BaseSparkTest {
-  def toDataFrame(edges: Seq[String]): DataFrame = {
+  //TODO: props to valid string.
+  def s2VertexToDataFrame(vertices: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = vertices.flatMap(s2.elementBuilder.toVertex(_))
+
+    elements.map { v =>
+      (v.ts, GraphUtil.fromOp(v.op),
+        "v", v.innerId.toIdString(), v.serviceName, v.columnName, "{}")
+    }.toDF(S2VertexDataFrameWriter.Fields: _*)
+  }
+
+  def s2EdgeToDataFrame(edges: Seq[String]): DataFrame = {
     import spark.sqlContext.implicits._
     val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
 
@@ -42,21 +54,24 @@ class SourceTest extends BaseSparkTest {
         e.label(),
         "{}",
         e.getDirection())
-    }.toDF("timestamp", "operation", "element", "from", "to", "label", 
"props", "direction")
+    }.toDF(S2EdgeDataFrameWriter.Fields: _*)
   }
 
-
-  test("S2GraphSource toDF") {
+  test("S2GraphSource edge toDF") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
     val snapshotTableName = options.tableName + "-snapshot"
 
     // 1. run S2GraphSink to build(not actually load by using LoadIncrementalLoad) bulk load file.
-    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-    val df = toDataFrame(Seq(bulkEdgeString))
+    val bulkEdges = Seq(
+      "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}",
+      "1416236400000\tinsert\tedge\ta\tc\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    )
+    val df = s2EdgeToDataFrame(bulkEdges)
 
     val reader = new RowBulkFormatReader
 
     val inputEdges = df.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
 
     val args = options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
@@ -92,10 +107,69 @@ class SourceTest extends BaseSparkTest {
     val source = new S2GraphSource(dumpConf)
     val realDF = source.toDF(spark)
     val outputEdges = realDF.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
 
     inputEdges.foreach { e => println(s"[Input]: $e")}
     outputEdges.foreach { e => println(s"[Output]: $e")}
 
     inputEdges shouldBe outputEdges
   }
+
+  test("S2GraphSource vertex toDF") {
+    val column = initTestVertexSchema(s2)
+    val snapshotTableName = options.tableName + "-snapshot"
+
+    val bulkVertices = Seq(
+      s"1416236400000\tinsert\tvertex\tc\t${column.service.serviceName}\t${column.columnName}\t{}",
+      s"1416236400000\tinsert\tvertex\td\t${column.service.serviceName}\t${column.columnName}\t{}"
+    )
+    val df = s2VertexToDataFrame(bulkVertices)
+
+    val reader = new RowBulkFormatReader
+
+    val input = df.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
+
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> 
"true")
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2GraphSink("testQuery", conf)
+    sink.write(df)
+
+    // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
+    s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
+      import scala.collection.JavaConverters._
+      if (admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
+        admin.deleteSnapshot(snapshotTableName)
+
+      admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
+    }
+
+    // 3. Decode S2GraphSource to parse HFile
+    val metaAndHBaseArgs = options.toConfigParams
+    val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
+
+    val dumpArgs = Map(
+      "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
+      "restore.path" -> "/tmp/hbase_restore",
+      "hbase.table.names" -> s"${snapshotTableName}",
+      "hbase.table.cf" -> "v",
+      "element.type" -> "Vertex"
+    ) ++ metaAndHBaseArgs
+
+    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
+    val source = new S2GraphSource(dumpConf)
+    val realDF = source.toDF(spark)
+
+    val output = realDF.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
+
+    input.foreach { e => println(s"[Input]: $e")}
+    output.foreach { e => println(s"[Output]: $e")}
+
+    input shouldBe output
+  }
 }
