implement writeBatch only for LoadIncrementHFile.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/86dcc112
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/86dcc112
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/86dcc112

Branch: refs/heads/master
Commit: 86dcc112d9582435c136ca60e050be3e993e685f
Parents: ae19dc1
Author: DO YUNG YOON <[email protected]>
Authored: Mon Apr 2 18:58:51 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Apr 2 19:02:53 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |   5 +-
 .../s2jobs/loader/GraphFileOptions.scala        |  24 +++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  10 +-
 .../serde/reader/RowBulkFormatReader.scala      |  19 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  16 +-
 .../sql/streaming/S2StreamQueryWriter.scala     |  48 -----
 .../s2jobs/dump/GraphFileDumperTest.scala       |  97 ----------
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 192 +++++++++----------
 .../apache/s2graph/s2jobs/task/SinkTest.scala   |  62 ++++++
 9 files changed, 221 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 9eb9cc8..69b3716 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -85,9 +85,10 @@ object S2GraphHelper {
     }
   }
 
-  //TODO:
   def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
-    GraphFileOptions()
+    val args = taskConf.options.flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+    GraphFileOptions.toOption(args)
   }
 
   def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {

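The new toGraphFileOptions flattens the task's option map into a CLI-style argument array and reuses the existing GraphFileOptions.toOption parser. A minimal sketch of that flattening, assuming the option keys already carry their "--" prefixes (as GraphFileOptions.toCommand below produces); the map contents are illustrative:

  // Illustrative only: a flag/value map becomes the Array that
  // GraphFileOptions.toOption expects.
  val taskOptions = Map("--input" -> "/tmp/edges", "--table" -> "s2graph")
  val args: Array[String] = taskOptions.flatMap { case (k, v) => Seq(k, v) }.toArray
  // args: Array("--input", "/tmp/edges", "--table", "s2graph")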
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 3e3ffb9..4bf8379 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -91,6 +91,10 @@ object GraphFileOptions {
       (inner.head, inner.last)
     }).toMap
   }
+
+  def toLabelMappingString(labelMapping: Map[String, String]): String =
+    labelMapping.map { case (k, v) => Seq(k, v).mkString(":") }.mkString(",")
+
 }
 /**
   * Option case class for TransferToHFile.
@@ -135,4 +139,24 @@ case class GraphFileOptions(input: String = "",
       "db.default.driver" -> dbDriver
     )
   }
+
+  def toCommand: Array[String] =
+    Array(
+      "--input", input,
+      "--tempDir", tempDir,
+      "--output", output,
+      "--zkQuorum", zkQuorum,
+      "--table", tableName,
+      "--dbUrl", dbUrl,
+      "--dbUser", dbUser,
+      "--dbPassword", dbPassword,
+      "--dbDriver", dbDriver,
+      "--maxHFilePerRegionServer", maxHFilePerRegionServer.toString,
+      "--numRegions", numRegions.toString,
+      "--labelMapping", GraphFileOptions.toLabelMappingString(labelMapping),
+      "--autoEdgeCreate", autoEdgeCreate.toString,
+      "--buildDegree", buildDegree.toString,
+      "--incrementalLoad", incrementalLoad.toString,
+      "--method", method
+    )
 }
\ No newline at end of file
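toCommand is the inverse serialization: it turns an options instance back into the flag/value array that GraphFileOptions.toOption parses, which is what lets a TaskConf carry bulk-load settings as a plain string map. A hedged round-trip sketch, with illustrative field values and assuming toOption accepts every flag toCommand emits:

  val opts = GraphFileOptions(input = "/tmp/edges", labelMapping = Map("old_label" -> "new_label"))
  val restored = GraphFileOptions.toOption(opts.toCommand)
  // labelMapping survives via toLabelMappingString: "old_label:new_label"
  assert(restored.labelMapping == opts.labelMapping)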

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 431631b..da190ee 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -24,10 +24,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.ConnectionFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
@@ -123,5 +124,12 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
     HFileGenerator.generateHFile(sc, config, kvs, _options)
   }
+
+  def loadIncrementHFile(options: GraphFileOptions): Int = {
+    /* LoadIncrementHFiles */
+    val hfileArgs = Array(options.output, options.tableName)
+    val hbaseConfig = HBaseConfiguration.create()
+    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  }
 }
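loadIncrementHFile wraps HBase's LoadIncrementalHFiles tool: given the HFile output directory and the target table name, it completes the bulk load and returns the tool's exit code. A usage sketch (the error handling is an illustration, not part of the patch):

  val exitCode = HFileGenerator.loadIncrementHFile(options)
  if (exitCode != 0) sys.error(s"LoadIncrementalHFiles failed with exit code $exitCode")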
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
index 73e56ce..12b2ba2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index bc67822..7c4c857 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,6 +20,9 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.Config
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
@@ -206,25 +209,26 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
-
   override def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      val config: Config = Management.toConfig(conf.options)
-      val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
+      val options = S2GraphHelper.toGraphFileOptions(conf)
+      val config = Management.toConfig(options.toConfigParams)
       val input = df.rdd
 
-      val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions)
+      val transformer = new SparkBulkLoaderTransformer(config, options)
 
       implicit val reader = new RowBulkFormatReader
       implicit val writer = new KeyValueWriter
 
       val kvs = transformer.transform(input)
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)
+      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+      // finish the bulk load by executing LoadIncrementalHFiles.
+      HFileGenerator.loadIncrementHFile(options)
     }
   }
 }
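Taken together, the batch branch of S2graphSink.write now runs the whole bulk load end to end. A condensed sketch of that path, using only names that appear in this patch and assuming the surrounding sink supplies conf and the incoming DataFrame df:

  def writeBatch(df: DataFrame, conf: TaskConf): Unit = {
    val options = S2GraphHelper.toGraphFileOptions(conf)      // TaskConf -> GraphFileOptions
    val config  = Management.toConfig(options.toConfigParams) // options -> Typesafe Config

    implicit val reader = new RowBulkFormatReader             // Row -> GraphElement
    implicit val writer = new KeyValueWriter                  // GraphElement -> HBase KeyValue

    val transformer = new SparkBulkLoaderTransformer(config, options)
    val kvs = transformer.transform(df.rdd)

    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
    HFileGenerator.loadIncrementHFile(options)                // complete the bulk load
  }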

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index ac37533..f6fecd7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -86,52 +86,4 @@ private [sql] class S2StreamQueryWriter(
 
   private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
     S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
-
-//  {
-//    val s2Graph = s2SinkContext.getGraph
-//    val row = encoder.fromRow(internalRow)
-//
-//    val timestamp = row.getAs[Long]("timestamp")
-//    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-//    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-//
-//    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-//      case Some(propsStr:String) =>
-//        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-//      case None =>
-//        schema.fieldNames.flatMap { field =>
-//          if (!RESERVED_COLUMN.contains(field)) {
-//            Seq(
-//              field -> getRowValAny(row, field)
-//            )
-//          } else Nil
-//        }.toMap
-//    }
-//
-//    elem match {
-//      case "e" | "edge" =>
-//        val from = getRowValAny(row, "from")
-//        val to = getRowValAny(row, "to")
-//        val label = row.getAs[String]("label")
-//        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-//        Some(
-//          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
-//        )
-//      case "v" | "vertex" =>
-//        val id = getRowValAny(row, "id")
-//        val serviceName = row.getAs[String]("service")
-//        val columnName = row.getAs[String]("column")
-//        Some(
-//          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
-//        )
-//      case _ =>
-//        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
-//        None
-//    }
-//  }
-
-  private def getRowValAny(row:Row, fieldName:String):Any = {
-    val idx = row.fieldIndex(fieldName)
-    row.get(idx)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
deleted file mode 100644
index 81566f9..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//package org.apache.s2graph.s2jobs.dump
-//
-//import org.apache.s2graph.core._
-//import org.apache.s2graph.core.types.HBaseType
-//import org.apache.s2graph.s2jobs.S2GraphHelper
-//import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-//import play.api.libs.json.Json
-//
-//class GraphFileDumperTest extends FunSuite with Matchers with BeforeAndAfterAll {
-//  private val master = "local[2]"
-//  private val appName = "example-spark"
-//
-//  private var sc: SparkContext = _
-//  val options = GraphFileOptions(
-//    input = "/tmp/imei-20.txt",
-//    tempDir = "/tmp/bulkload_tmp",
-//    output = "/tmp/s2graph_bulkload",
-//    zkQuorum = "localhost",
-//    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-//    dbUser = "sa",
-//    dbPassword = "sa",
-//    dbDriver = "org.h2.Driver",
-//    tableName = "s2graph",
-//    maxHFilePerRegionServer = 1,
-//    numRegions = 3,
-//    compressionAlgorithm = "NONE",
-//    buildDegree = false,
-//    autoEdgeCreate = false)
-//
-//  val s2Config = Management.toConfig(options.toConfigParams)
-//
-//  val tableName = options.tableName
-//  val schemaVersion = HBaseType.DEFAULT_VERSION
-//  val compressionAlgorithm: String = options.compressionAlgorithm
-//  var s2: S2Graph = _
-//
-//  override def beforeAll(): Unit = {
-//    // initialize spark context.
-//    val conf = new SparkConf()
-//      .setMaster(master)
-//      .setAppName(appName)
-//
-//    sc = new SparkContext(conf)
-//
-//    s2 = S2GraphHelper.initS2Graph(s2Config)
-//  }
-//
-//  override def afterAll(): Unit = {
-//    if (sc != null) sc.stop()
-//    if (s2 != null) s2.shutdown()
-//  }
-//
-//  test("test dump.") {
-//    implicit val graph = s2
-//    val snapshotPath = "/usr/local/var/hbase"
-//    val restorePath = "/tmp/hbase_restore"
-//    val snapshotTableNames = Seq("s2graph-snapshot")
-//
-//    val cellLs = HFileDumper.toKeyValue(sc, snapshotPath, restorePath,
-//      snapshotTableNames, columnFamily = "v")
-//
-//    val kvsLs = cellLs.map(CanGraphElement.cellsToSKeyValues).collect()
-//
-//    val elements = kvsLs.flatMap { kvs =>
-//      CanGraphElement.sKeyValueToGraphElement(s2)(kvs)
-//    }
-//
-//    elements.foreach { element =>
-//      val v = element.asInstanceOf[S2VertexLike]
-//      val json = Json.prettyPrint(PostProcess.s2VertexToJson(v).get)
-//
-//      println(json)
-//    }
-//
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index c382813..991897b 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.{PostProcess, S2VertexLike}
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
 import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
@@ -27,6 +27,8 @@ import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
+import scala.io.Source
+
 class GraphFileGeneratorTest extends BaseSparkTest {
 
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
@@ -116,106 +118,100 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
     bulkEdge shouldBe (indexEdge)
   }
-//  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
-//    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-//    /* end of initialize model */
-//
-//    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-//
-//    val transformerMode = "local"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
-//
-//    val serDe = s2.defaultStorage.serDe
-//
-//    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
-//
-//    val indexEdges = ls.flatMap { kv =>
-//      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
-//    }
-//
-//    val indexEdge = indexEdges.head
-//
-//    println(indexEdge)
-//    println(bulkEdge)
-//
-//    bulkEdge shouldBe (indexEdge)
-//  }
-//
-//  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-//
-//    val transformerMode = "spark"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-//
-//    val serDe = s2.defaultStorage.serDe
-//
-//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-//
-//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-//      println(Json.prettyPrint(jsValue))
-//    }
-//
-//    bulkVertex shouldBe (vertex)
-//  }
-//
-//  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+    /* end of initialize model */
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
+
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+    }
+
+    val indexEdge = indexEdges.head
+
+    println(indexEdge)
+    println(bulkEdge)
+
+    bulkEdge shouldBe (indexEdge)
+  }
+
+  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
+    val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+
+    val transformerMode = "spark"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe (vertex)
+  }
+
+  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
+    val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe (vertex)
+  }
+
+  //   this test case expects options.input to already exist in valid bulk load format.
+    test("bulk load and fetch vertex: spark mode") {
+      import scala.collection.JavaConverters._
+      val serviceColumn = initTestVertexSchema(s2)
+
+      val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+      val input = sc.parallelize(bulkVertexLs)
+
+      HFileGenerator.generate(sc, s2Config, input, options)
+      HFileGenerator.loadIncrementHFile(options)
+
+      val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+      val json = PostProcess.verticesToJson(s2Vertices)
+
+      println(Json.prettyPrint(json))
+    }
+
+  //   this test case expects options.input to already exist in valid bulk load format.
+//    test("bulk load and fetch vertex: mr mode") {
+//      import scala.collection.JavaConverters._
+//      val serviceColumn = initTestVertexSchema(s2)
 //
-//    val transformerMode = "local"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//      val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+//      val input = sc.parallelize(bulkVertexLs)
 //
-//    val serDe = s2.defaultStorage.serDe
+//      HFileMRGenerator.generate(sc, s2Config, input, options)
+//      HFileGenerator.loadIncrementHFile(options)
 //
-//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+//      val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+//      val json = PostProcess.verticesToJson(s2Vertices)
 //
-//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-//      println(Json.prettyPrint(jsValue))
+//      println(Json.prettyPrint(json))
 //    }
-//
-//    bulkVertex shouldBe (vertex)
-//  }
-
-  //   this test case expect options.input already exist with valid bulk load format.
-  //  test("bulk load and fetch vertex: spark mode") {
-  //    import scala.collection.JavaConverters._
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
-
-  //   this test case expect options.input already exist with valid bulk load format.
-  //  test("bulk load and fetch vertex: mr mode") {
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileMRGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
 }
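The re-enabled spark-mode test assumes options.input already exists and contains valid bulk-load lines. A sketch of seeding such a file beforehand (the use of a local path and the vertex line are illustrative, following the TSV format used in the tests above):

  import java.io.PrintWriter
  val pw = new PrintWriter(options.input) // assumes options.input is a local file path
  pw.println("20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{}")
  pw.close()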

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
new file mode 100644
index 0000000..a21b3df
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SinkTest extends BaseSparkTest {
+  def toDataFrame(edges: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+    elements.map { e =>
+      (e.getTs(),
+        e.getOperation(),
+        "e",
+        e.srcVertex.innerIdVal.toString,
+        e.tgtVertex.innerIdVal.toString,
+        e.label(),
+        "{}",
+        e.getDirection())
+    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+  }
+
+  test("S2graphSink writeBatch.") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+}
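The test drives the sink through its public TaskConf surface: options.toCommand is regrouped into a flag/value map, so the sink parses exactly what a real job configuration would supply. That pairing step in isolation (values illustrative):

  val cmd = Array("--input", "/tmp/edges", "--table", "s2graph")
  val argsMap: Map[String, String] = cmd.grouped(2).map(kv => kv.head -> kv.last).toMap
  // argsMap: Map("--input" -> "/tmp/edges", "--table" -> "s2graph")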
