Remove unnecessary HConnection from HBaseContext and add a generateHFile test case.
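HBaseContext.bulkLoad no longer creates an HConnection just to look up region start keys; the caller now passes startKeys explicitly. TransferToHFile obtains them in one of two ways, roughly as in the sketch below (illustrative only, not code from this patch; the local val names are made up):

    // incremental load: read split points from the existing table
    val startKeys = TransferToHFile.getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))

    // fresh table: derive split points from the requested pre-split count alone
    val presplitKeys = TransferToHFile.getStartKeys(numRegions = options.numRegions)

    // either array is then handed to HBaseContext.bulkLoad via generateHFile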
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/62a7ef3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/62a7ef3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/62a7ef3e

Branch: refs/heads/master
Commit: 62a7ef3e3ca9aa7cefe75ae5c6924cef3b2785ce
Parents: 1111275
Author: DO YUNG YOON <[email protected]>
Authored: Wed Feb 28 15:37:19 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Mar 2 10:04:37 2018 +0900

----------------------------------------------------------------------
 .../s2graph/loader/spark/HBaseContext.scala     |  12 +--
 .../loader/spark/HBaseRDDFunctions.scala        |   3 +-
 .../loader/subscriber/TransferToHFile.scala     | 106 ++++++++++++++++---
 .../loader/subscriber/TransferToHFileTest.scala |  80 ++++++++------
 .../hbase/AsynchbaseStorageManagement.scala     |  16 +--
 5 files changed, 155 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/62a7ef3e/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
index 1f68dc2..e177196 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
@@ -591,6 +591,7 @@
    *
    * @param rdd The RDD we are bulk loading from
    * @param tableName The HBase table we are loading into
+   * @param startKeys
    * @param flatMap A flapMap function that will make every
    *                row in the RDD
    *                into N cells for the bulk load
@@ -603,17 +604,16 @@
    */
   def bulkLoad[T](rdd:RDD[T],
                   tableName: TableName,
+                  startKeys: Array[Array[Byte]],
                   flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                   stagingDir:String,
-                  familyHFileWriteOptionsMap:
-                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] = new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
-    val conn = ConnectionFactory.createConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
+//    val conn = ConnectionFactory.createConnection(config)
+//    val regionLocator = conn.getRegionLocator(tableName)
+//    val startKeys = regionLocator.getStartKeys
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr)


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/62a7ef3e/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
index b818a3c..54949a8 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
@@ -192,6 +192,7 @@ object HBaseRDDFunctions
      */
     def hbaseBulkLoad(hc: HBaseContext,
                       tableName: TableName,
+                      startKeys: Array[Array[Byte]],
                       flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                       stagingDir:String,
                       familyHFileWriteOptionsMap:
@@ -199,7 +200,7 @@ object HBaseRDDFunctions
                       new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
                       compactionExclude: Boolean = false,
                       maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
-      hc.bulkLoad(rdd, tableName,
+      hc.bulkLoad(rdd, tableName, startKeys,
         flatMap, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
     }


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/62a7ef3e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 0d72b9c..bfb5a96 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -22,6 +22,7 @@ package org.apache.s2graph.loader.subscriber
 import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client.ConnectionFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
@@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
 import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.s2graph.spark.spark.SparkApp
@@ -45,17 +47,19 @@ object TransferToHFile extends SparkApp {
   var options:GraphFileOptions = _

   case class GraphFileOptions(input: String = "",
-                              tmpPath: String = "",
+                              tmpPath: String = s"/tmp/bulkload",
                               zkQuorum: String = "",
                               tableName: String = "",
                               dbUrl: String = "",
                               dbUser: String = "",
                               dbPassword: String = "",
                               maxHFilePerRegionServer: Int = 1,
+                              numRegions: Int = 3,
                               labelMapping: Map[String, String] = Map.empty[String, String],
                               autoEdgeCreate: Boolean = false,
                               buildDegree: Boolean = false,
-                              compressionAlgorithm: String = "") {
+                              incrementalLoad: Boolean = false,
+                              compressionAlgorithm: String = "NONE") {
     def toConfigParams = {
       Map(
         "hbase.zookeeper.quorum" -> zkQuorum,
@@ -93,6 +97,10 @@ object TransferToHFile extends SparkApp {
       c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
     )

+    opt[Int]('n', "numRegions").action ( (x, c) =>
+      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
+    )
+
     opt[String]('l', "labelMapping").action( (x, c) =>
       c.copy(labelMapping = toLabelMapping(x))
     ).text("mapping info to change the label from source (originalLabel:newLabel)")
@@ -101,6 +109,10 @@

     opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
       c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
+
+    opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
+      c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not."
+    )
   }

   //TODO: Process AtomicIncrementRequest too.
@@ -233,6 +245,44 @@ object TransferToHFile extends SparkApp {
     }
   }

+  def generateHFile(sc: SparkContext,
+                    s2Config: Config,
+                    kvs: RDD[KeyValue],
+                    options: GraphFileOptions): Unit = {
+    val hbaseConfig = toHBaseConfig(options)
+    val startKeys =
+      if (options.incrementalLoad) {
+        // need hbase connection to existing table to figure out the ranges of regions.
+        getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
+      } else {
+        // otherwise we do not need to initialize Connection to hbase cluster.
+        // only numRegions determine region's pre-split.
+        getStartKeys(numRegions = options.numRegions)
+      }
+
+    val hbaseSc = new HBaseContext(sc, hbaseConfig)
+
+    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
+      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
+      val v = CellUtil.cloneValue(kv)
+      Seq((k -> v)).toIterator
+    }
+
+    val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
+    val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
+      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
+
+
+    hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.tmpPath, familyOptionsMap)
+  }
+
+  def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = {
+    val conn = ConnectionFactory.createConnection(hbaseConfig)
+    val regionLocator = conn.getRegionLocator(tableName)
+    regionLocator.getStartKeys
+  }
+
   def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
     val hbaseConf = HBaseConfiguration.create()
@@ -243,6 +293,27 @@
     hbaseConf
   }

+  def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
+    val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
+    val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
+    if (numRegions < 3) {
+      throw new IllegalArgumentException("Must create at least three regions")
+    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
+      throw new IllegalArgumentException("Start key must be smaller than end key")
+    }
+    val empty = new Array[Byte](0)
+
+    if (numRegions == 3) {
+      Array(empty, startKey, endKey)
+    } else {
+      val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3)
+      if (splitKeys == null || splitKeys.length != numRegions - 1) {
+        throw new IllegalArgumentException("Unable to split key range into enough regions")
+      }
+      Array(empty) ++ splitKeys.toSeq
+    }
+  }
+
   override def run() = {
     parser.parse(args, GraphFileOptions()) match {
       case Some(o) => options = o
@@ -265,21 +336,22 @@
     GraphSubscriberHelper.apply(s2Config)

     val merged = TransferToHFile.generateKeyValues(sc, s2Config, rdd,
       options)
-
-    /* set up hbase init */
-    val hbaseSc = new HBaseContext(sc, toHBaseConfig(options))
-
-    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
-      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
-      val v = CellUtil.cloneValue(kv)
-      Seq((k -> v)).toIterator
-    }
-
-    val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
-      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
-    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
-
-    hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), flatMap, options.tmpPath, familyOptionsMap)
+    generateHFile(sc, s2Config, merged, options)
+//    /* set up hbase init */
+//    val hbaseSc = new HBaseContext(sc, toHBaseConfig(options))
+//
+//    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
+//      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
+//      val v = CellUtil.cloneValue(kv)
+//      Seq((k -> v)).toIterator
+//    }
+//
+//    val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
+//      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+//    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
+//
+//    val startKeys = getStartKeys(numRegions = options.numRegions)
+//    hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), startKeys, flatMap, options.tmpPath, familyOptionsMap)
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/62a7ef3e/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
index 77c147e..2287fe2 100644
--- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
+++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -22,7 +22,7 @@ import java.util

 import org.apache.s2graph.core.{Management, PostProcess}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.spark.{SparkConf, SparkContext}
@@ -43,12 +43,14 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll
   /* TransferHFile parameters */
   val options = GraphFileOptions(
     zkQuorum = "localhost",
+    tmpPath = "/tmp/s2graph",
     dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
     dbUser = "sa",
     dbPassword = "sa",
     tableName = "s2graph",
     maxHFilePerRegionServer = 1,
-    compressionAlgorithm = "gz",
+    numRegions = 3,
+    compressionAlgorithm = "NONE",
     buildDegree = false,
     autoEdgeCreate = false)
@@ -76,11 +78,8 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll
     }
   }

-
-  test("test edge only.") {
+  private def initTestEdgeSchema(): Label = {
     import scala.collection.JavaConverters._
-    import org.apache.s2graph.core.storage.CanSKeyValue._
-
     /* initialize model for test */
     val management = GraphSubscriberHelper.management

@@ -96,7 +95,39 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll
         hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = "")
     }

-    val label = Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
+    Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
+  }
+
+  private def initTestVertexSchema(): ServiceColumn = {
+    import scala.collection.JavaConverters._
+    /* initialize model for test */
+    val management = GraphSubscriberHelper.management
+
+    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
+
+    management.createServiceColumn(service.serviceName, "imei", "string",
+      Seq(
+        Prop(name = "first_time", defaultValue = "''", dataType = "string"),
+        Prop(name = "last_time", defaultValue = "''", dataType = "string"),
+        Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"),
+        Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
+        Prop(name = "active_months", defaultValue = "-1", dataType = "integer"),
+        Prop(name = "fua", defaultValue = "''", dataType = "string"),
+        Prop(name = "location_often_province", defaultValue = "''", dataType = "string"),
+        Prop(name = "location_often_city", defaultValue = "''", dataType = "string"),
+        Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"),
+        Prop(name = "location_last_province", defaultValue = "''", dataType = "string"),
+        Prop(name = "location_last_city", defaultValue = "''", dataType = "string"),
+        Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer")
+      ).asJava)
+  }
+
+  test("test generateKeyValues edge only.") {
+    import scala.collection.JavaConverters._
+    import org.apache.s2graph.core.storage.CanSKeyValue._
+
+    val label = initTestEdgeSchema()
     /* end of initialize model */

     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
@@ -122,30 +153,9 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll
     bulkEdge shouldBe(indexEdge)
   }

-  test("test vertex only.") {
-    import scala.collection.JavaConverters._
-    /* initialize model for test */
-    val management = GraphSubscriberHelper.management
-
-    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
-      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
-
-    val serviceColumn = management.createServiceColumn(service.serviceName, "imei", "string",
-      Seq(
-        Prop(name = "first_time", defaultValue = "''", dataType = "string"),
-        Prop(name = "last_time", defaultValue = "''", dataType = "string"),
-        Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "active_months", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "fua", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_often_province", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_often_city", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "location_last_province", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_last_city", defaultValue = "''", dataType = "string"),
"fimei_legality", defaultValue = "-1", dataType = "integer") - ).asJava) + test("test generateKeyValues vertex only.") { + val serviceColumn = initTestVertexSchema() val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" val bulkVertex = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get @@ -167,4 +177,14 @@ class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll bulkVertex shouldBe(vertex) } + + test("test generateHFile vertex only.") { + val serviceColumn = initTestVertexSchema() + + val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" + val input = sc.parallelize(Seq(bulkVertexString)) + + val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) + TransferToHFile.generateHFile(sc, s2Config, kvs, options) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/62a7ef3e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala index 0fb3173..8475ba6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala @@ -56,6 +56,14 @@ object AsynchbaseStorageManagement { val DefaultCreateTableOptions = Map( "hbase.zookeeper.quorum" -> "localhost" ) + + def getStartKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount)) + } + + def getEndKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) + } } class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClient]) extends StorageManagement { @@ -271,12 +279,4 @@ class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClie conn.getAdmin } } - - private def getStartKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount)) - } - - private def getEndKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) - } }

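Callers that go through the implicit RDD helper see the same change: hbaseBulkLoad now takes the start keys between tableName and flatMap. A minimal sketch of the new call shape, assuming the implicit conversion from HBaseRDDFunctions is in scope and that rdd, hbaseContext, flatMapToCells and stagingDir are defined by the caller:

    import org.apache.hadoop.hbase.TableName
    import org.apache.s2graph.loader.spark.HBaseRDDFunctions._

    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf("s2graph"),
      startKeys,        // new parameter: explicit region start keys
      flatMapToCells,
      stagingDir)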