Implement writeBatch only for LoadIncrementalHFiles.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/86dcc112
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/86dcc112
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/86dcc112

Branch: refs/heads/master
Commit: 86dcc112d9582435c136ca60e050be3e993e685f
Parents: ae19dc1
Author: DO YUNG YOON <[email protected]>
Authored: Mon Apr 2 18:58:51 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Apr 2 19:02:53 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |   5 +-
 .../s2jobs/loader/GraphFileOptions.scala        |  24 +++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  10 +-
 .../serde/reader/RowBulkFormatReader.scala      |  19 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  16 +-
 .../sql/streaming/S2StreamQueryWriter.scala     |  48 -----
 .../s2jobs/dump/GraphFileDumperTest.scala       |  97 ----------
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 192 +++++++++----------
 .../apache/s2graph/s2jobs/task/SinkTest.scala   |  62 ++++++
 9 files changed, 221 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 9eb9cc8..69b3716 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -85,9 +85,10 @@ object S2GraphHelper {
     }
   }
 
-  //TODO:
   def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
-    GraphFileOptions()
+    val args = taskConf.options.flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+    GraphFileOptions.toOption(args)
   }
 
   def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 3e3ffb9..4bf8379 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -91,6 +91,10 @@ object GraphFileOptions {
       (inner.head, inner.last)
     }).toMap
   }
+
+  def toLabelMappingString(labelMapping: Map[String, String]): String =
+    labelMapping.map { case (k, v) => Seq(k, v).mkString(":") }.mkString(",")
+
 }
 /**
   * Option case class for TransferToHFile.
@@ -135,4 +139,24 @@ case class GraphFileOptions(input: String = "",
       "db.default.driver" -> dbDriver
     )
   }
+
+  def toCommand: Array[String] =
+    Array(
+      "--input", input,
+      "--tempDir", tempDir,
+      "--output", output,
+      "--zkQuorum", zkQuorum,
+      "--table", tableName,
+      "--dbUrl", dbUrl,
+      "--dbUser", dbUser,
+      "--dbPassword", dbPassword,
+      "--dbDriver", dbDriver,
+      "--maxHFilePerRegionServer", maxHFilePerRegionServer.toString,
+      "--numRegions", numRegions.toString,
+      "--labelMapping", GraphFileOptions.toLabelMappingString(labelMapping),
+      "--autoEdgeCreate", autoEdgeCreate.toString,
+      "--buildDegree", buildDegree.toString,
+      "--incrementalLoad", incrementalLoad.toString,
+      "--method", method
+    )
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index 431631b..da190ee 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -24,10 +24,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.ConnectionFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
@@ -123,5 +124,12 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
     HFileGenerator.generateHFile(sc, config, kvs, _options)
   }
+
+  def loadIncrementHFile(options: GraphFileOptions): Int = {
+    /* LoadIncrementalHFiles */
+    val hfileArgs = Array(options.output, options.tableName)
+    val hbaseConfig = HBaseConfiguration.create()
+    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
index 73e56ce..12b2ba2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index bc67822..7c4c857 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,6 +20,9 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.Config
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
@@ -206,25 +209,26 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
-
   override def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      val config: Config = Management.toConfig(conf.options)
-      val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
+      val options = S2GraphHelper.toGraphFileOptions(conf)
+      val config = Management.toConfig(options.toConfigParams)
       val input = df.rdd
 
-      val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions)
+      val transformer = new SparkBulkLoaderTransformer(config, options)
 
       implicit val reader = new RowBulkFormatReader
      implicit val writer = new KeyValueWriter
 
       val kvs = transformer.transform(input)
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)
+      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+      // finish bulk load by executing LoadIncrementalHFiles.
+      HFileGenerator.loadIncrementHFile(options)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index ac37533..f6fecd7 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -86,52 +86,4 @@ private [sql] class S2StreamQueryWriter(
 
   private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
     S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
-
-//  {
-//    val s2Graph = s2SinkContext.getGraph
-//    val row = encoder.fromRow(internalRow)
-//
-//    val timestamp = row.getAs[Long]("timestamp")
-//    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
-//    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
-//
-//    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
-//      case Some(propsStr:String) =>
-//        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
-//      case None =>
-//        schema.fieldNames.flatMap { field =>
-//          if (!RESERVED_COLUMN.contains(field)) {
-//            Seq(
-//              field -> getRowValAny(row, field)
-//            )
-//          } else Nil
-//        }.toMap
-//    }
-//
-//    elem match {
-//      case "e" | "edge" =>
-//        val from = getRowValAny(row, "from")
-//        val to = getRowValAny(row, "to")
-//        val label = row.getAs[String]("label")
-//        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
-//        Some(
-//          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
-//        )
-//      case "v" | "vertex" =>
-//        val id = getRowValAny(row, "id")
-//        val serviceName = row.getAs[String]("service")
-//        val columnName = row.getAs[String]("column")
-//        Some(
-//          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
-//        )
-//      case _ =>
-//        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
-//        None
-//    }
-//  }
-
-  private def getRowValAny(row:Row, fieldName:String):Any = {
-    val idx = row.fieldIndex(fieldName)
-    row.get(idx)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
deleted file mode 100644
index 81566f9..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//package org.apache.s2graph.s2jobs.dump
-//
-//import org.apache.s2graph.core._
-//import org.apache.s2graph.core.types.HBaseType
-//import org.apache.s2graph.s2jobs.S2GraphHelper
-//import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-//import play.api.libs.json.Json
-//
-//class GraphFileDumperTest extends FunSuite with Matchers with BeforeAndAfterAll {
-//  private val master = "local[2]"
-//  private val appName = "example-spark"
-//
-//  private var sc: SparkContext = _
-//  val options = GraphFileOptions(
-//    input = "/tmp/imei-20.txt",
-//    tempDir = "/tmp/bulkload_tmp",
-//    output = "/tmp/s2graph_bulkload",
-//    zkQuorum = "localhost",
-//    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-//    dbUser = "sa",
-//    dbPassword = "sa",
-//    dbDriver = "org.h2.Driver",
-//    tableName = "s2graph",
-//    maxHFilePerRegionServer = 1,
-//    numRegions = 3,
-//    compressionAlgorithm = "NONE",
-//    buildDegree = false,
-//    autoEdgeCreate = false)
-//
-//  val s2Config = Management.toConfig(options.toConfigParams)
-//
-//  val tableName = options.tableName
-//  val schemaVersion = HBaseType.DEFAULT_VERSION
-//  val compressionAlgorithm: String = options.compressionAlgorithm
-//  var s2: S2Graph = _
-//
-//  override def beforeAll(): Unit = {
-//    // initialize spark context.
-//    val conf = new SparkConf()
-//      .setMaster(master)
-//      .setAppName(appName)
-//
-//    sc = new SparkContext(conf)
-//
-//    s2 = S2GraphHelper.initS2Graph(s2Config)
-//  }
-//
-//  override def afterAll(): Unit = {
-//    if (sc != null) sc.stop()
-//    if (s2 != null) s2.shutdown()
-//  }
-//
-//  test("test dump.") {
-//    implicit val graph = s2
-//    val snapshotPath = "/usr/local/var/hbase"
-//    val restorePath = "/tmp/hbase_restore"
-//    val snapshotTableNames = Seq("s2graph-snapshot")
-//
-//    val cellLs = HFileDumper.toKeyValue(sc, snapshotPath, restorePath,
-//      snapshotTableNames, columnFamily = "v")
-//
-//    val kvsLs = cellLs.map(CanGraphElement.cellsToSKeyValues).collect()
-//
-//    val elements = kvsLs.flatMap { kvs =>
-//      CanGraphElement.sKeyValueToGraphElement(s2)(kvs)
-//    }
-//
-//    elements.foreach { element =>
-//      val v = element.asInstanceOf[S2VertexLike]
-//      val json = Json.prettyPrint(PostProcess.s2VertexToJson(v).get)
-//
-//      println(json)
-//    }
-//
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index c382813..991897b 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.{PostProcess, S2VertexLike}
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
 import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
@@ -27,6 +27,8 @@ import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
+import scala.io.Source
+
 class GraphFileGeneratorTest extends BaseSparkTest {
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 
@@ -116,106 +118,100 @@ class GraphFileGeneratorTest extends BaseSparkTest {
     bulkEdge shouldBe (indexEdge)
   }
 
-//  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
-//    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-//    /* end of initialize model */
-//
-//    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-//
-//    val transformerMode = "local"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
-//
-//    val serDe = s2.defaultStorage.serDe
-//
-//    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
-//
-//    val indexEdges = ls.flatMap { kv =>
-//      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
-//    }
-//
-//    val indexEdge = indexEdges.head
-//
-//    println(indexEdge)
-//    println(bulkEdge)
-//
-//    bulkEdge shouldBe (indexEdge)
-//  }
-//
-//  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
-//
-//    val transformerMode = "spark"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
-//
-//    val serDe = s2.defaultStorage.serDe
-//
-//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
-//
-//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-//      println(Json.prettyPrint(jsValue))
-//    }
-//
-//    bulkVertex shouldBe (vertex)
-//  }
-//
-//  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-//    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+    /* end of initialize model */
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
+
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+    }
+
+    val indexEdge = indexEdges.head
+
+    println(indexEdge)
+    println(bulkEdge)
+
+    bulkEdge shouldBe (indexEdge)
+  }
+
+  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
+    val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+
+    val transformerMode = "spark"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe (vertex)
+  }
+
+  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
+    val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
+
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+
+    val serDe = s2.defaultStorage.serDe
+
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe (vertex)
+  }
+
+  // this test case expects options.input to already exist with valid bulk load format.
+  test("bulk load and fetch vertex: spark mode") {
+    import scala.collection.JavaConverters._
+    val serviceColumn = initTestVertexSchema(s2)
+
+    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+    val input = sc.parallelize(bulkVertexLs)
+
+    HFileGenerator.generate(sc, s2Config, input, options)
+    HFileGenerator.loadIncrementHFile(options)
+
+    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+    val json = PostProcess.verticesToJson(s2Vertices)
+
+    println(Json.prettyPrint(json))
+  }
+
+  // this test case expects options.input to already exist with valid bulk load format.
+// test("bulk load and fetch vertex: mr mode") {
+//    import scala.collection.JavaConverters._
+//    val serviceColumn = initTestVertexSchema(s2)
 //
-//    val transformerMode = "local"
-//    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
+//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+//    val input = sc.parallelize(bulkVertexLs)
 //
-//    val serDe = s2.defaultStorage.serDe
+//    HFileMRGenerator.generate(sc, s2Config, input, options)
+//    HFileGenerator.loadIncrementHFile(options)
 //
-//    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
+//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+//    val json = PostProcess.verticesToJson(s2Vertices)
 //
-//    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
-//      println(Json.prettyPrint(jsValue))
+//    println(Json.prettyPrint(json))
 //  }
-//
-//    bulkVertex shouldBe (vertex)
-//  }
-
-  // this test case expect options.input already exist with valid bulk load format.
-  // test("bulk load and fetch vertex: spark mode") {
-  //    import scala.collection.JavaConverters._
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
-
-  // this test case expect options.input already exist with valid bulk load format.
-  // test("bulk load and fetch vertex: mr mode") {
-  //    val serviceColumn = initTestVertexSchema(s2)
-  //
-  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-  //    val input = sc.parallelize(bulkVertexLs)
-  //
-  //    HFileMRGenerator.generate(sc, s2Config, input, options)
-  //
-  //    val hfileArgs = Array(options.output, options.tableName)
-  //    val hbaseConfig = HBaseConfiguration.create()
-  //
-  //    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-  //    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-  //    val json = PostProcess.verticesToJson(s2Vertices)
-  //
-  //    println(Json.prettyPrint(json))
-  //  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/86dcc112/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
new file mode 100644
index 0000000..a21b3df
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SinkTest extends BaseSparkTest {
+  def toDataFrame(edges: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+    elements.map { e =>
+      (e.getTs(),
+        e.getOperation(),
+        "e",
+        e.srcVertex.innerIdVal.toString,
+        e.tgtVertex.innerIdVal.toString,
+        e.label(),
+        "{}",
+        e.getDirection())
+    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+  }
+
+  test("S2graphSink writeBatch.") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+}
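
The batch path this commit completes is a two-step flow: transform rows into HFiles under options.output, then hand those HFiles to HBase with LoadIncrementalHFiles. A minimal standalone sketch of that flow follows. It is not code from this commit: the driver object, file paths, and local ZooKeeper/h2 endpoints are illustrative assumptions, and only APIs exercised in the diff above (GraphFileOptions.toOption, Management.toConfig, HFileGenerator.generate, HFileGenerator.loadIncrementHFile) are used.

```scala
import org.apache.s2graph.core.Management
import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; the paths, ZooKeeper quorum, and h2 metastore
// settings below are placeholders, mirroring the flags emitted by toCommand.
object BulkLoadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("bulk-load-sketch"))

    // Same round trip S2graphSink performs: flag/value pairs parsed back
    // into a GraphFileOptions by GraphFileOptions.toOption.
    val options = GraphFileOptions.toOption(Array(
      "--input", "/tmp/bulk_input.txt",      // assumed bulk-load-format file
      "--tempDir", "/tmp/bulkload_tmp",
      "--output", "/tmp/s2graph_bulkload",   // HFiles are written here
      "--zkQuorum", "localhost",
      "--table", "s2graph",
      "--dbUrl", "jdbc:h2:file:./var/metastore;MODE=MYSQL",
      "--dbUser", "sa",
      "--dbPassword", "sa",
      "--dbDriver", "org.h2.Driver"))

    val config = Management.toConfig(options.toConfigParams)
    val input = sc.textFile(options.input) // RDD[String] of bulk-format lines

    // Step 1: generate HFiles from the input lines.
    HFileGenerator.generate(sc, config, input, options)

    // Step 2: finish the bulk load; this wraps ToolRunner.run with
    // LoadIncrementalHFiles over (options.output, options.tableName).
    val exitCode = HFileGenerator.loadIncrementHFile(options)
    sc.stop()
    if (exitCode != 0) sys.error(s"LoadIncrementalHFiles exited with $exitCode")
  }
}
```

SinkTest above exercises the same round trip from the other direction: options.toCommand flattens a GraphFileOptions into flag/value pairs, TaskConf carries them as a plain map, and S2GraphHelper.toGraphFileOptions re-parses them with GraphFileOptions.toOption before S2graphSink.write runs both steps.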
