Add test case for S2GraphSource.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a0ce6f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a0ce6f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a0ce6f8e

Branch: refs/heads/master
Commit: a0ce6f8e578d33151651b70593c75d1c99c57cf3
Parents: 30137d4
Author: DO YUNG YOON <[email protected]>
Authored: Wed Apr 4 17:47:36 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Apr 4 17:47:36 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/S2EdgeLike.scala    |  7 +-
 .../org/apache/s2graph/core/S2VertexLike.scala  |  5 +-
 .../s2graph/s2jobs/loader/HFileGenerator.scala  | 23 +++----
 .../loader/SparkBulkLoaderTransformer.scala     | 14 +++-
 .../s2graph/s2jobs/serde/Transformer.scala      |  1 -
 .../s2jobs/serde/reader/IdentityReader.scala    | 19 ++++++
 .../s2jobs/serde/reader/S2GraphCellReader.scala | 67 ++++++++++++++++----
 .../s2jobs/serde/writer/DataFrameWriter.scala   | 25 --------
 .../writer/GraphElementDataFrameWriter.scala    | 49 ++++++++++++++
 .../s2jobs/serde/writer/IdentityWriter.scala    | 19 ++++++
 .../org/apache/s2graph/s2jobs/task/Source.scala | 29 +++++++--
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  9 +++
 .../s2graph/s2jobs/S2GraphHelperTest.scala      |  2 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  |  2 +-
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 52 ++++++++++++---
 15 files changed, 245 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index f2ea4ad..f581e52 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -250,6 +250,11 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def toLogString: String = {
     //    val allPropsWithName = defaultPropsWithName ++ 
Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
-    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, 
innerLabel.label, propsWithTs).mkString("\t")
+    val propsWithName = for {
+      (k, v) <- propsWithTs.asScala.toMap
+      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
+    } yield (v.labelMeta.name -> jsValue)
+
+    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, 
innerLabel.label, Json.toJson(propsWithName)).mkString("\t")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 2fbc4f1..eb01da3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -61,8 +61,9 @@ trait S2VertexLike extends Vertex with GraphElement {
 
   def toLogString(): String = {
     val propsWithName = for {
-      (k, v) <- props.asScala
-    } yield (v.columnMeta.name -> v.value.toString)
+      (k, v) <- props.asScala.toMap
+      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
+    } yield (v.columnMeta.name -> jsValue)
 
     val (serviceName, columnName) =
       if (!id.storeColId) ("", "")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index ee6b842..c105448 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -33,11 +33,11 @@ import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, 
TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.{DataFrameWriter, KeyValueWriter}
+import org.apache.s2graph.s2jobs.serde.writer.{GraphElementDataFrameWriter, 
IdentityWriter, KeyValueWriter}
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, 
KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -141,24 +141,25 @@ object HFileGenerator extends RawFileGenerator[String, 
KeyValue] {
 
   def tableSnapshotDump(ss: SparkSession,
            config: Config,
-           options: GraphFileOptions,
            snapshotPath: String,
            restorePath: String,
            tableNames: Seq[String],
            columnFamily: String = "e",
-           batchSize: Int = 1000): DataFrame = {
-    import ss.sqlContext.implicits._
-
+           elementType: String = "IndexEdge",
+           batchSize: Int = 1000,
+           labelMapping: Map[String, String] = Map.empty,
+           buildDegree: Boolean = false): RDD[Seq[Cell]] = {
     val cf = Bytes.toBytes(columnFamily)
 
     val hbaseConfig = 
HBaseConfiguration.create(ss.sparkContext.hadoopConfiguration)
     hbaseConfig.set("hbase.rootdir", snapshotPath)
 
     val initial = ss.sparkContext.parallelize(Seq.empty[Seq[Cell]])
-    val input = tableNames.foldLeft(initial) { case (prev, tableName) =>
+    tableNames.foldLeft(initial) { case (prev, tableName) =>
       val scan = new Scan
       scan.addFamily(cf)
       scan.setBatch(batchSize)
+      scan.setMaxVersions(1)
       TableSnapshotInputFormatImpl.setInput(hbaseConfig, tableName, new 
Path(restorePath))
       hbaseConfig.set(TableInputFormat.SCAN, 
Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
 
@@ -170,14 +171,6 @@ object HFileGenerator extends RawFileGenerator[String, 
KeyValue] {
 
       prev ++ current
     }
-
-    implicit val reader = new S2GraphCellReader
-    implicit val writer = new DataFrameWriter
-
-    val transformer = new SparkBulkLoaderTransformer(config, options)
-    val kvs = transformer.transform(input)
-
-    kvs.toDF(DataFrameWriter.Fields: _*)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 03d9784..5f8d3e5 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -28,14 +28,22 @@ import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
 class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends 
Transformer[RDD] {
+                                 val labelMapping: Map[String, String] = 
Map.empty,
+                                 val buildDegree: Boolean = false) extends 
Transformer[RDD] {
+
+  val GraphElementEncoder = org.apache.spark.sql.Encoders.kryo[GraphElement]
+
+  implicit val encoder = GraphElementEncoder
+
+  def this(config: Config, options: GraphFileOptions) =
+    this(config, options.labelMapping, options.buildDegree)
 
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit 
writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
       val s2 = S2GraphHelper.initS2Graph(config)
 
       iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
+        DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
       }
     }.reduceByKey(_ + _)
 
@@ -63,7 +71,7 @@ class SparkBulkLoaderTransformer(val config: Config,
       iter.map(writer.write(s2)(_))
     }
 
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    if (buildDegree) kvs ++ buildDegrees(elements)
     else kvs
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index a448d3f..0b6dcba 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -34,7 +34,6 @@ import scala.reflect.ClassTag
   */
 trait Transformer[M[_]] extends Serializable {
   val config: Config
-//  val options: GraphFileOptions
 
   def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: 
GraphElementWritable[T]): M[T]
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
index b4d1eb2..a4d985b 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
index 868f0f2..f5487ab 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.hadoop.hbase.Cell
@@ -7,7 +26,7 @@ import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
-class S2GraphCellReader extends GraphElementReadable[Seq[Cell]]{
+class S2GraphCellReader(elementType: String) extends 
GraphElementReadable[Seq[Cell]]{
   override def read(s2: S2Graph)(cells: Seq[Cell]): Option[GraphElement] = {
     if (cells.isEmpty) None
     else {
@@ -20,18 +39,42 @@ class S2GraphCellReader extends 
GraphElementReadable[Seq[Cell]]{
         new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, 
cell.getQualifier,
           cell.getValue, cell.getTimestamp, SKeyValue.Default)
       }
+      elementType match {
+        case "IndexEdge" =>
+          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
+            throw new IllegalArgumentException(s"$elementType is provided by 
user, but actual column family differ as e")
+
+          s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer)
+            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+        case "SnapshotEdge" =>
+          //TODO: replace this to use separate column family: 
SKeyValue.SnapshotEdgeCF
+          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
+            throw new IllegalArgumentException(s"$elementType is provided by 
user, but actual column family differ as e")
 
-      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
-        
s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, 
None).map(_.asInstanceOf[GraphElement])
-      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
-        val indexEdgeOpt = 
s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, 
None)
-        if (indexEdgeOpt.isDefined) 
indexEdgeOpt.map(_.asInstanceOf[GraphElement])
-        else {
-          val snapshotEdgeOpt = 
s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, 
None)
-          if (snapshotEdgeOpt.isDefined) 
snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
-          else throw new IllegalStateException(s"column family indicate this 
is edge, but neither snapshot/index edge.")
-        }
-      } else throw new IllegalStateException(s"wrong column family. 
${Bytes.toString(cf)}")
+          s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer)
+            .fromKeyValues(kvs, None).map(_.toEdge.asInstanceOf[GraphElement])
+        case "Vertex" =>
+          if (!Bytes.equals(cf, SKeyValue.VertexCf))
+            throw new IllegalArgumentException(s"$elementType is provided by 
user, but actual column family differ as v")
+
+          s2.defaultStorage.serDe.vertexDeserializer(schemaVer)
+            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+        case _ => throw new IllegalArgumentException(s"$elementType is not 
supported column family.")
+      }
+//      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
+//        
s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, 
None).map(_.asInstanceOf[GraphElement])
+//      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
+//        val indexEdgeOpt = 
s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, 
None)
+//        if (indexEdgeOpt.isDefined) 
indexEdgeOpt.map(_.asInstanceOf[GraphElement])
+//        else {
+//          //TODO: Current version use same column family for snapshotEdge 
and indexEdge.
+//
+//          val snapshotEdgeOpt = 
s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, 
None)
+////          if (snapshotEdgeOpt.isDefined) 
snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
+////          else throw new IllegalStateException(s"column family indicate 
this is edge, but neither snapshot/index edge.")
+//          None
+//        }
+//      } else throw new IllegalStateException(s"wrong column family. 
${Bytes.toString(cf)}")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
deleted file mode 100644
index 458821e..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.s2graph.s2jobs.serde.writer
-
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.DegreeKey
-import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-
-object DataFrameWriter {
-  type GraphElementTuple = (String, String, String, String, String, String, 
String, String)
-  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", 
"props", "direction")
-}
-
-class DataFrameWriter extends 
GraphElementWritable[DataFrameWriter.GraphElementTuple]{
-  override def write(s2: S2Graph)(element: GraphElement): (String, String, 
String, String, String, String, String, String) = {
-    val Array(ts, op, elem, from, to, label, props, dir) = 
element.toLogString().split("\t")
-
-    (ts, op, elem, from, to, label, props, dir)
-  }
-
-  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): 
(String, String, String, String, String, String, String, String) = {
-    val element = DegreeKey.toEdge(s2, degreeKey, count)
-    val Array(ts, op, elem, from, to, label, props, dir) = 
element.toLogString().split("\t")
-
-    (ts, op, elem, from, to, label, props, dir)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
new file mode 100644
index 0000000..b6cbbb3
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+object GraphElementDataFrameWriter {
+  type GraphElementTuple = (Long, String, String, String, String, String, 
String, String)
+  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", 
"props", "direction")
+}
+
+class GraphElementDataFrameWriter extends 
GraphElementWritable[GraphElementDataFrameWriter.GraphElementTuple] {
+  import GraphElementDataFrameWriter._
+  private def toGraphElementTuple(tokens: Array[String]): GraphElementTuple = {
+    tokens match {
+      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, 
elem, from, to, label, props, dir)
+      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, 
elem, from, to, label, props, "out")
+      case _ => throw new IllegalStateException(s"${tokens.toList} is 
malformed.")
+    }
+  }
+  override def write(s2: S2Graph)(element: GraphElement): GraphElementTuple = {
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): 
GraphElementTuple = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
index 32bab6a..9d2656c 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.writer
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 3d0aefb..7b52415 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import org.apache.s2graph.core.Management
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.HFileGenerator
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, 
SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
+import org.apache.s2graph.s2jobs.serde.writer.GraphElementDataFrameWriter
+import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
 
 
 /**
@@ -111,15 +112,29 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
   override def mandatoryOptions: Set[String] = Set("hbase.rootdir", 
"restore.path", "hbase.table.names")
 
   override def toDF(ss: SparkSession): DataFrame = {
-    val options = TaskConf.toGraphFileOptions(conf)
-    val config = Management.toConfig(options.toConfigParams)
+    import ss.sqlContext.implicits._
+    val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ 
TaskConf.parseMetaStoreConfigs(conf)
+    val config = Management.toConfig(mergedConf)
 
     val snapshotPath = conf.options("hbase.rootdir")
     val restorePath = conf.options("restore.path")
     val tableNames = conf.options("hbase.table.names").split(",")
     val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
     val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
+    val labelMapping = Map.empty[String, String]
+    val buildDegree = conf.options.getOrElse("build.degree", "false").toBoolean
+    val elementType = conf.options.getOrElse("element.type", "IndexEdge")
 
-    HFileGenerator.tableSnapshotDump(ss, config, options, snapshotPath, 
restorePath, tableNames, columnFamily, batchSize)
+    val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
+      restorePath, tableNames, columnFamily, elementType, batchSize, 
labelMapping, buildDegree)
+
+
+    implicit val reader = new S2GraphCellReader(elementType)
+    implicit val writer = new GraphElementDataFrameWriter
+
+    val transformer = new SparkBulkLoaderTransformer(config, labelMapping, 
buildDegree)
+    val kvs = transformer.transform(cells)
+
+    kvs.toDF(GraphElementDataFrameWriter.Fields: _*)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ddd56bf..6ba2468 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -29,6 +29,15 @@ object TaskConf {
 
     GraphFileOptions.toOption(args)
   }
+
+  def parseHBaseConfigs(taskConf: TaskConf): Map[String, String] = {
+    taskConf.options.filterKeys(_.startsWith("hbase."))
+  }
+
+  def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, String] = {
+    taskConf.options.filterKeys(_.startsWith("db."))
+  }
+
 }
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, 
options:Map[String, String] = Map.empty)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index f2b0102..6b21cfc 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -29,7 +29,7 @@ class S2GraphHelperTest extends BaseSparkTest {
 
     println(args)
     val taskConf = TaskConf("dummy", "sink", Nil, args)
-    val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf)
+    val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
     println(graphFileOptions)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 991897b..dfdb595 100644
--- 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -190,7 +190,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       val input = sc.parallelize(bulkVertexLs)
 
       HFileGenerator.generate(sc, s2Config, input, options)
-      HFileGenerator.loadIncrementHFile(options)
+      HFileGenerator.loadIncrementalHFiles(options)
 
       val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
       val json = PostProcess.verticesToJson(s2Vertices)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index d2788c2..6e712a6 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -19,8 +19,11 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.s2graph.core.S2EdgeLike
-import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.core.storage.hbase.{AsynchbaseStorage, 
AsynchbaseStorageManagement}
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper}
 import org.apache.spark.sql.DataFrame
 
 import scala.collection.JavaConverters._
@@ -42,28 +45,57 @@ class SourceTest extends BaseSparkTest {
     }.toDF("timestamp", "operation", "element", "from", "to", "label", 
"props", "direction")
   }
 
-  test("S2graphSink writeBatch.") {
+
+  test("S2GraphSource toDF") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, 
compressionAlgorithm)
+    val snapshotTableName = options.tableName + "-snapshot"
 
+    // 1. run S2GraphSink to build(not actually load by using 
LoadIncrementalLoad) bulk load file.
     val bulkEdgeString = 
"1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
     val df = toDataFrame(Seq(bulkEdgeString))
+
+    val reader = new RowBulkFormatReader
+
+    val inputEdges = df.collect().flatMap(reader.read(s2)(_))
+
     val args = options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
-    }.toMap
+    }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> 
"true")
 
     val conf = TaskConf("test", "sql", Seq("input"), args)
 
     val sink = new S2GraphSink("testQuery", conf)
     sink.write(df)
 
+    // 2. Recreate the snapshot (deleting any existing one) to test 
TableSnapshotInputFormat.
+    
s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config)
 { admin =>
+      import scala.collection.JavaConverters._
+      if 
(admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
+        admin.deleteSnapshot(snapshotTableName)
+
+      admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
+    }
+
+    // 3. Decode S2GraphSource to parse HFile
+    val metaAndHBaseArgs = options.toConfigParams
+    val hbaseConfig = 
HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
+
     val dumpArgs = Map(
-      "hbase.rootdir" -> "",
-      "restore.path" -> "",
-      "hbase.table.names" -> Seq(options.tableName).mkString(","),
-      "hbase.table.cf" -> "e"
-    )
+      "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
+      "restore.path" -> "/tmp/hbase_restore",
+      "hbase.table.names" -> s"${snapshotTableName}",
+      "hbase.table.cf" -> "e",
+      "element.type" -> "IndexEdge"
+    ) ++ metaAndHBaseArgs
+
     val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
-    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
-    s2Edges.foreach { edge => println(edge) }
+    val source = new S2GraphSource(dumpConf)
+    val realDF = source.toDF(spark)
+    val outputEdges = realDF.collect().flatMap(reader.read(s2)(_))
+
+    inputEdges.foreach { e => println(s"[Input]: $e")}
+    outputEdges.foreach { e => println(s"[Output]: $e")}
+
+    inputEdges shouldBe outputEdges
   }
 }

Reply via email to