Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 3332f6bc1 -> 1799ae456


implement S2GraphSink.write method for writeBatch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b5535ebc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b5535ebc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b5535ebc

Branch: refs/heads/master
Commit: b5535ebccf72cd71ba45c177dc1d4ea2adaf94b8
Parents: 3332f6b
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Apr 2 17:10:45 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Apr 2 17:10:45 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  57 ++++-
 .../SparkGraphElementLoaderTransformer.scala    |  75 +++++++
 .../serde/reader/RowBulkFormatReader.scala      |  14 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  25 ++-
 .../sql/streaming/S2StreamQueryWriter.scala     |  88 ++++----
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |  13 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 216 ++++++++++---------
 7 files changed, 338 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 3f80e8f..9eb9cc8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,9 +24,15 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import play.api.libs.json.Json
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
 
 import scala.concurrent.ExecutionContext
+import scala.util.Try
 
 object S2GraphHelper {
   def initS2Graph(config: Config)(implicit ec: ExecutionContext = 
ExecutionContext.Implicits.global): S2Graph = {
@@ -78,4 +84,53 @@ object S2GraphHelper {
       Nil
     }
   }
+
+  //TODO:
+  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+    GraphFileOptions()
+  }
+
+  def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, 
reservedColumn: Set[String]): Option[GraphElement] = {
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+      case Some(propsStr:String) =>
+        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+      case None =>
+        schema.fieldNames.flatMap { field =>
+          if (!reservedColumn.contains(field)) {
+            Seq(
+              field -> getRowValAny(row, field)
+            )
+          } else Nil
+        }.toMap
+    }
+
+    elem match {
+      case "e" | "edge" =>
+        val from = getRowValAny(row, "from")
+        val to = getRowValAny(row, "to")
+        val label = row.getAs[String]("label")
+        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+        Some(
+          s2.elementBuilder.toEdge(from, to, label, direction, props, 
timestamp, operation)
+        )
+      case "v" | "vertex" =>
+        val id = getRowValAny(row, "id")
+        val serviceName = row.getAs[String]("service")
+        val columnName = row.getAs[String]("column")
+        Some(
+          s2.elementBuilder.toVertex(serviceName, columnName, id, props, 
timestamp, operation)
+        )
+      case _ =>
+        None
+    }
+  }
+
+  private def getRowValAny(row:Row, fieldName:String):Any = {
+    val idx = row.fieldIndex(fieldName)
+    row.get(idx)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
new file mode 100644
index 0000000..fcf8d4c
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.serde.Transformer
+import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, 
TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+class SparkGraphElementLoaderTransformer(val config: Config,
+                                         val options: GraphFileOptions) 
extends Transformer[Row, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
+  val reader = new RowBulkFormatReader
+
+  val writer = new KeyValueWriter
+
+  override def read(input: RDD[Row]): RDD[GraphElement] = input.mapPartitions 
{ iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.flatMap(reader.read(s2)(_))
+  }
+
+  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = 
elements.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.map(writer.write(s2)(_))
+  }
+
+  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] 
= {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        DegreeKey.toKeyValue(s2, degreeKey, count)
+      }
+    }
+  }
+
+  override def transform(input: RDD[Row]): RDD[Seq[HKeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
new file mode 100644
index 0000000..73e56ce
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.sql.Row
+
+class RowBulkFormatReader extends GraphElementReadable[Row] {
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", 
"operation", "elem", "direction")
+
+  override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, 
RESERVED_COLUMN)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 3d5beb6..b7a91d9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,6 +19,12 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, 
SparkBulkLoaderTransformer, SparkGraphElementLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
@@ -189,7 +195,22 @@ class S2graphSink(queryName:String, conf:TaskConf) extends 
Sink(queryName, conf)
   override def mandatoryOptions: Set[String] = Set()
   override val FORMAT: String = 
"org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
-    throw new RuntimeException(s"unsupported source type for 
${this.getClass.getSimpleName} : ${conf.name}")
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", 
"operation", "elem", "direction")
+
+  override def write(inputDF: DataFrame):Unit = {
+    val df = repartition(preprocess(inputDF), 
inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+      val config: Config = Management.toConfig(conf.options)
+      val bulkLoadOptions: GraphFileOptions = 
S2GraphHelper.toGraphFileOptions(conf)
+      val input = df.rdd
+
+      val transformer = new SparkGraphElementLoaderTransformer(config, 
bulkLoadOptions)
+      val kvs = transformer.transform(input)
+
+      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, 
kvs.flatMap(ls => ls), bulkLoadOptions)
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index e15efe8..ac37533 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.spark.sql.streaming
 
 import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
@@ -83,48 +84,51 @@ private [sql] class S2StreamQueryWriter(
     }
   }
 
-  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
-    val s2Graph = s2SinkContext.getGraph
-    val row = encoder.fromRow(internalRow)
-
-    val timestamp = row.getAs[Long]("timestamp")
-    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-
-    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-      case Some(propsStr:String) =>
-        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-      case None =>
-        schema.fieldNames.flatMap { field =>
-          if (!RESERVED_COLUMN.contains(field)) {
-            Seq(
-              field -> getRowValAny(row, field)
-            )
-          } else Nil
-        }.toMap
-    }
-
-    elem match {
-      case "e" | "edge" =>
-        val from = getRowValAny(row, "from")
-        val to = getRowValAny(row, "to")
-        val label = row.getAs[String]("label")
-        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-        Some(
-          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, 
timestamp, operation)
-        )
-      case "v" | "vertex" =>
-        val id = getRowValAny(row, "id")
-        val serviceName = row.getAs[String]("service")
-        val columnName = row.getAs[String]("column")
-        Some(
-          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, 
timestamp, operation)
-        )
-      case _ =>
-        logger.warn(s"'$elem' is not GraphElement. skipped!! 
(${row.toString()})")
-        None
-    }
-  }
+  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, 
encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
+
+//  {
+//    val s2Graph = s2SinkContext.getGraph
+//    val row = encoder.fromRow(internalRow)
+//
+//    val timestamp = row.getAs[Long]("timestamp")
+//    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+//    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+//
+//    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+//      case Some(propsStr:String) =>
+//        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+//      case None =>
+//        schema.fieldNames.flatMap { field =>
+//          if (!RESERVED_COLUMN.contains(field)) {
+//            Seq(
+//              field -> getRowValAny(row, field)
+//            )
+//          } else Nil
+//        }.toMap
+//    }
+//
+//    elem match {
+//      case "e" | "edge" =>
+//        val from = getRowValAny(row, "from")
+//        val to = getRowValAny(row, "to")
+//        val label = row.getAs[String]("label")
+//        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+//        Some(
+//          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, 
timestamp, operation)
+//        )
+//      case "v" | "vertex" =>
+//        val id = getRowValAny(row, "id")
+//        val serviceName = row.getAs[String]("service")
+//        val columnName = row.getAs[String]("column")
+//        Some(
+//          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, 
props, timestamp, operation)
+//        )
+//      case _ =>
+//        logger.warn(s"'$elem' is not GraphElement. skipped!! 
(${row.toString()})")
+//        None
+//    }
+//  }
 
   private def getRowValAny(row:Row, fieldName:String):Any = {
     val idx = row.fieldIndex(fieldName)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 78000d4..4f02808 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import java.io.{File, PrintWriter}
 
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
@@ -31,11 +32,10 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
 
-class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
+class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with 
DataFrameSuiteBase {
   private val master = "local[2]"
   private val appName = "example-spark"
 
-  protected var sc: SparkContext = _
   protected val options = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",
@@ -65,18 +65,15 @@ class BaseSparkTest extends FunSuite with Matchers with 
BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     // initialize spark context.
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    sc = new SparkContext(conf)
+    super.beforeAll()
 
     s2 = S2GraphHelper.initS2Graph(s2Config)
     initTestDataFile
   }
 
   override def afterAll(): Unit = {
-    if (sc != null) sc.stop()
+    super.afterAll()
+
     if (s2 != null) s2.shutdown()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b5535ebc/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3fbbd88..baf9b32 100644
--- 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,24 +19,22 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
-import scala.io.Source
-
 class GraphFileGeneratorTest extends BaseSparkTest {
-  import scala.concurrent.ExecutionContext.Implicits.global
+
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 
+  import scala.concurrent.ExecutionContext.Implicits.global
+
   def transformToSKeyValues(transformerMode: String, edges: Seq[String]): 
List[SKeyValue] = {
     transformerMode match {
       case "spark" =>
-        val input = sc.parallelize(edges)
+        val input: RDD[String] = sc.parallelize(edges)
         val transformer = new SparkBulkLoaderTransformer(s2Config, options)
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
@@ -54,39 +52,39 @@ class GraphFileGeneratorTest extends BaseSparkTest {
             CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
           }
         }.toList
-    }
-  }
-  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, 
compressionAlgorithm)
-    /* end of initialize model */
-
-    val bulkEdgeString = 
"1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-
-    val transformerMode = "spark"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, 
options.labelMapping).get
 
-    val indexEdges = ls.flatMap { kv =>
-      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), 
None)
+      case "dataset" =>
+        import spark.sqlContext.implicits._
+        val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+        val rows = elements.map { e =>
+          (e.getTs(),
+            e.getOperation(),
+            "e",
+            e.srcVertex.innerIdVal.toString,
+            e.tgtVertex.innerIdVal.toString,
+            e.label(),
+            "{}",
+            e.getDirection())
+        }.toDF("timestamp", "operation", "element", "from", "to", "label", 
"props", "direction").rdd
+
+        val transformer = new SparkGraphElementLoaderTransformer(s2Config, 
options)
+        val kvs = transformer.transform(rows)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.collect().toList
     }
-
-    val indexEdge = indexEdges.head
-
-    println(indexEdge)
-    println(bulkEdge)
-
-    bulkEdge shouldBe (indexEdge)
   }
-  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+
+  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, 
compressionAlgorithm)
     /* end of initialize model */
 
     val bulkEdgeString = 
"1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 
-    val transformerMode = "local"
+    val transformerMode = "dataset"
     val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 
     val serDe = s2.defaultStorage.serDe
@@ -104,82 +102,106 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
     bulkEdge shouldBe (indexEdge)
   }
-
-  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
-    val serviceColumn = initTestVertexSchema(s2)
-    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, 
options.labelMapping).get
-
-    val transformerMode = "spark"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val vertex = 
serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, 
None).get
-
-    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-      println(Json.prettyPrint(jsValue))
-    }
-
-    bulkVertex shouldBe(vertex)
-  }
-
-  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
-    val serviceColumn = initTestVertexSchema(s2)
-    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, 
options.labelMapping).get
-
-    val transformerMode = "local"
-    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-
-    val serDe = s2.defaultStorage.serDe
-
-    val vertex = 
serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, 
None).get
-
-    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-      println(Json.prettyPrint(jsValue))
-    }
-
-    bulkVertex shouldBe(vertex)
-  }
-
-//   this test case expect options.input already exist with valid bulk load 
format.
-//  test("bulk load and fetch vertex: spark mode") {
-//    import scala.collection.JavaConverters._
-//    val serviceColumn = initTestVertexSchema(s2)
+//  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+//    val label = initTestEdgeSchema(s2, tableName, schemaVersion, 
compressionAlgorithm)
+//    /* end of initialize model */
+//
+//    val bulkEdgeString = 
"1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 //
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
+//    val transformerMode = "local"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 //
-//    HFileGenerator.generate(sc, s2Config, input, options)
+//    val serDe = s2.defaultStorage.serDe
 //
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
+//    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, 
options.labelMapping).get
 //
-//    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+//    val indexEdges = ls.flatMap { kv =>
+//      
serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+//    }
 //
-//    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
+//    val indexEdge = indexEdges.head
 //
-//    println(Json.prettyPrint(json))
+//    println(indexEdge)
+//    println(bulkEdge)
+//
+//    bulkEdge shouldBe (indexEdge)
 //  }
-
-//   this test case expect options.input already exist with valid bulk load 
format.
-//  test("bulk load and fetch vertex: mr mode") {
+//
+//  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
+//    val serviceColumn = initTestVertexSchema(s2)
+//    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, 
options.labelMapping).get
+//
+//    val transformerMode = "spark"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//
+//    val serDe = s2.defaultStorage.serDe
+//
+//    val vertex = 
serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, 
None).get
+//
+//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+//      println(Json.prettyPrint(jsValue))
+//    }
+//
+//    bulkVertex shouldBe (vertex)
+//  }
+//
+//  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
 //    val serviceColumn = initTestVertexSchema(s2)
+//    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, 
options.labelMapping).get
 //
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
+//    val transformerMode = "local"
+//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
 //
-//    HFileMRGenerator.generate(sc, s2Config, input, options)
+//    val serDe = s2.defaultStorage.serDe
 //
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
+//    val vertex = 
serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, 
None).get
 //
-//    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
+//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+//      println(Json.prettyPrint(jsValue))
+//    }
 //
-//    println(Json.prettyPrint(json))
+//    bulkVertex shouldBe (vertex)
 //  }
+
+  //   this test case expect options.input already exist with valid bulk load 
format.
+  //  test("bulk load and fetch vertex: spark mode") {
+  //    import scala.collection.JavaConverters._
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //
+  //    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
+
+  //   this test case expect options.input already exist with valid bulk load 
format.
+  //  test("bulk load and fetch vertex: mr mode") {
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileMRGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
 }

Reply via email to