Add integration test to test case.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ab8ba2f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ab8ba2f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ab8ba2f4 Branch: refs/heads/master Commit: ab8ba2f4aa7b97770a4c036648335b2d1e065faf Parents: 62a7ef3 Author: DO YUNG YOON <[email protected]> Authored: Fri Mar 2 22:30:23 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Mar 2 22:30:23 2018 +0900 ---------------------------------------------------------------------- .../loader/subscriber/TransferToHFileTest.scala | 44 +++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ab8ba2f4/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala index 2287fe2..2eef811 100644 --- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala +++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala @@ -18,13 +18,20 @@ */ package org.apache.s2graph.loader.subscriber +import java.io.PrintWriter import java.util +import com.typesafe.config.ConfigFactory +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles +import org.apache.hadoop.util.ToolRunner import org.apache.s2graph.core.{Management, PostProcess} import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} import org.apache.s2graph.core.storage.CanSKeyValue 
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.loader.subscriber.TransferToHFile.options import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import play.api.libs.json.Json @@ -34,6 +41,7 @@ import scala.util.Try class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll { import TransferToHFile._ + import scala.collection.JavaConverters._ private val master = "local[2]" private val appName = "example-spark" @@ -42,8 +50,9 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll /* TransferHFile parameters */ val options = GraphFileOptions( - zkQuorum = "localhost", + input = "/tmp/test.txt", tmpPath = "/tmp/s2graph", + zkQuorum = "localhost", dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL", dbUser = "sa", dbPassword = "sa", @@ -78,6 +87,12 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll } } + private def writeToFile(fileName: String)(lines: Seq[String]): Unit = { + val writer = new PrintWriter(fileName) + lines.foreach(line => writer.write(line + "\n")) + writer.close + } + private def initTestEdgeSchema(): Label = { import scala.collection.JavaConverters._ /* initialize model for test */ @@ -187,4 +202,31 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) TransferToHFile.generateHFile(sc, s2Config, kvs, options) } + + test("test loader script.") { + val serviceColumn = initTestVertexSchema() + + val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}" + val bulkVertexLs = Seq(bulkVertexString) + writeToFile(options.input)(bulkVertexLs) + + val input = sc.parallelize(bulkVertexLs) + GraphSubscriberHelper.apply(s2Config) + val graph = GraphSubscriberHelper.g + val vertex = graph.elementBuilder.toVertex(bulkVertexString).get + + val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) + TransferToHFile.generateHFile(sc, s2Config, kvs, options) + + val hfileArgs = Array(options.tmpPath, options.tableName) + val hbaseConfig = HBaseConfiguration.create() + + val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) + + val vertexId = graph.elementBuilder.newVertexId("s2graph")("imei")("800188448586078") + val vertexOpt = graph.getVertex(vertexId) + + vertexOpt.isDefined shouldBe(true) + vertexOpt.get shouldBe (vertex) + } }
