- add s2jobs subproject.
- migrate bulk loader to Spark 2.3.0.
- add test cases for bulk loader.
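The diff below introduces a three-step bulk load: parse bulk-format TSV lines into HBase KeyValues, write those KeyValues out as HFiles, then complete the load with HBase's LoadIncrementalHFiles. A minimal driver-side sketch of that flow, assuming the names added in this commit (GraphFileOptions, generateKeyValues, generateHFile, GraphSubscriberHelper); the wrapper object itself is illustrative only:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.util.ToolRunner
import org.apache.s2graph.core.Management
import org.apache.s2graph.loader.subscriber.{GraphSubscriberHelper, TransferToHFile}
import org.apache.s2graph.loader.subscriber.TransferToHFile.GraphFileOptions
import org.apache.spark.SparkContext

object BulkLoadFlowSketch {
  def run(sc: SparkContext, options: GraphFileOptions): Int = {
    val s2Config = Management.toConfig(options.toConfigParams)

    // driver-side initialization of the S2Graph instance and management,
    // mirroring TransferToHFile.run() and the new test's beforeAll().
    GraphSubscriberHelper.apply(s2Config)

    // 1. parse bulk-format lines into HBase KeyValues (plus degree rows when buildDegree is set).
    val input = sc.textFile(options.input)
    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)

    // 2. write the KeyValues as HFiles under options.tmpPath, pre-split into options.numRegions.
    TransferToHFile.generateHFile(sc, s2Config, kvs, options)

    // 3. move the finished HFiles into the live table (the step loader.py runs via the hbase CLI).
    val hbaseConfig = HBaseConfiguration.create()
    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig),
      Array(options.tmpPath, options.tableName))
  }
}

The same sequence is what the new "test loader script." case in TransferToHFileTest exercises end to end against a local HBase.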
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/db7f0191 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/db7f0191 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/db7f0191 Branch: refs/heads/master Commit: db7f01914073b8f56f64c243d797ee538c1ffc51 Parents: aeaff3f Author: DO YUNG YOON <[email protected]> Authored: Tue Mar 6 15:01:50 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Mar 6 16:02:41 2018 +0900 ---------------------------------------------------------------------- CHANGES | 1 + build.sbt | 11 +- loader/build.sbt | 5 +- loader/loader.py | 85 +- .../s2graph/loader/spark/HBaseContext.scala | 12 +- .../loader/spark/HBaseRDDFunctions.scala | 3 +- .../loader/subscriber/GraphSubscriber.scala | 10 + .../loader/subscriber/TransferToHFile.scala | 278 ++++-- .../loader/subscriber/TransferToHFileTest.scala | 232 +++++ project/Common.scala | 1 + .../s2graph/core/GraphElementBuilder.scala | 2 +- .../org/apache/s2graph/core/JSONParser.scala | 3 +- .../s2graph/core/mysqls/ServiceColumn.scala | 4 +- .../apache/s2graph/core/storage/SKeyValue.scala | 4 + .../s2graph/core/storage/StorageSerDe.scala | 2 +- .../hbase/AsynchbaseStorageManagement.scala | 16 +- s2jobs/build.sbt | 57 ++ s2jobs/loader.py | 153 ++++ .../hbase/mapreduce/GraphHFileOutputFormat.java | 169 ++++ .../apache/s2graph/s2jobs/S2GraphHelper.scala | 31 + .../s2jobs/loader/GraphFileGenerator.scala | 42 + .../s2jobs/loader/GraphFileOptions.scala | 138 +++ .../s2graph/s2jobs/loader/HFileGenerator.scala | 222 +++++ .../s2jobs/loader/HFileMRGenerator.scala | 161 ++++ .../s2jobs/loader/RawFileGenerator.scala | 51 ++ .../s2jobs/spark/BulkLoadPartitioner.scala | 56 ++ .../s2jobs/spark/FamilyHFileWriteOptions.scala | 35 + .../s2graph/s2jobs/spark/HBaseContext.scala | 850 +++++++++++++++++++ .../s2jobs/spark/HBaseDStreamFunctions.scala | 158 ++++ .../s2jobs/spark/HBaseRDDFunctions.scala | 206 +++++ .../s2graph/s2jobs/spark/JavaHBaseContext.scala | 342 ++++++++ .../s2jobs/spark/KeyFamilyQualifier.scala | 46 + .../s2graph/s2jobs/S2GraphHelperTest.scala | 24 + .../s2jobs/loader/GraphFileGeneratorTest.scala | 256 ++++++ s2rest_play/conf/application.conf | 2 +- 35 files changed, 3546 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b54aaeb..1f72a38 100644 --- a/CHANGES +++ b/CHANGES @@ -39,6 +39,7 @@ Release Notes - S2Graph - Version 0.2.0 * [S2GRAPH-168] - Fix args order mismatch when use addServiceColumnProp * [S2GRAPH-176] - Fix compile error on LabelMeta * [S2GRAPH-179] - Add defaultValue on ColumnMeta + * [S2GRAPH-178] - Fix null pointer error on BulkLoader ** Improvement * [S2GRAPH-72] - Support Apache TinkerPop and Gremlin http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index cdabfbf..af6fbd7 100755 --- a/build.sbt +++ b/build.sbt @@ -56,8 +56,8 @@ lazy val s2core = project.settings(commonSettings: _*) lazy val spark = project.settings(commonSettings: _*) -lazy val loader = project.dependsOn(s2core, spark) - .settings(commonSettings: _*) +//lazy val loader = project.dependsOn(s2core, spark) +// 
.settings(commonSettings: _*) lazy val s2counter_core = project.dependsOn(s2core) .settings(commonSettings: _*) @@ -65,12 +65,15 @@ lazy val s2counter_core = project.dependsOn(s2core) lazy val s2counter_loader = project.dependsOn(s2counter_core, spark) .settings(commonSettings: _*) +lazy val s2jobs = project.dependsOn(s2core) + .settings(commonSettings: _*) + lazy val s2graph_gremlin = project.dependsOn(s2core) .settings(commonSettings: _*) lazy val root = (project in file(".")) .aggregate(s2core, s2rest_play) - .dependsOn(s2rest_play, s2rest_netty, loader, s2counter_loader, s2graphql) // this enables packaging on the root project + .dependsOn(s2rest_play, s2rest_netty, s2jobs, s2counter_loader, s2graphql) // this enables packaging on the root project .settings(commonSettings: _*) lazy val runRatTask = inputKey[Unit]("Runs Apache rat on S2Graph") @@ -88,7 +91,7 @@ publishSigned := { (publishSigned in s2rest_netty).value (publishSigned in s2core).value (publishSigned in spark).value - (publishSigned in loader).value + (publishSigned in s2jobs).value (publishSigned in s2counter_core).value (publishSigned in s2counter_loader).value } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/build.sbt ---------------------------------------------------------------------- diff --git a/loader/build.sbt b/loader/build.sbt index ac7d948..a93713a 100644 --- a/loader/build.sbt +++ b/loader/build.sbt @@ -29,14 +29,15 @@ projectDependencies := Seq( libraryDependencies ++= Seq( "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility - "org.apache.spark" %% "spark-core" % sparkVersion % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion % "provided" exclude("javax.servlet", "*"), "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", "org.apache.spark" %% "spark-hive" % sparkVersion % "provided", "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion, "org.apache.httpcomponents" % "fluent-hc" % "4.2.5", "org.specs2" %% "specs2-core" % specs2Version % "test", "org.scalatest" %% "scalatest" % "2.2.1" % "test", - "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion + "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion, + "com.github.scopt" %% "scopt" % "3.7.0" ) crossScalaVersions := Seq("2.10.6") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/loader.py ---------------------------------------------------------------------- diff --git a/loader/loader.py b/loader/loader.py index 1d4dc32..634f2be 100644 --- a/loader/loader.py +++ b/loader/loader.py @@ -16,50 +16,76 @@ # specific language governing permissions and limitations # under the License. 
-import os, sys, urllib2, urllib +import os, sys +#, urllib2, urllib def cleanup(args): cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"] - print cmd + print(cmd) ret = os.system(cmd) - print cmd, "return", ret + print(cmd, "return", ret) return ret def hfile(args): - cmd = """spark-submit --class "subscriber.TransferToHFile" \ + cmd = """spark-submit --class "org.apache.s2graph.loader.subscriber.TransferToHFile" \ --name "TransferToHFile@shon" \ --conf "spark.task.maxFailures=20" \ --master yarn-cluster \ ---num-executors %s --driver-memory 1g --executor-memory 2g --executor-cores 1 %s \ -%s /tmp/%s %s %s %s %s %s %s""" % (args["num_executors"], JAR, args["input"], args["htable_name"], args["hbase_zk"], args["htable_name"], args["db_url"], args["max_file_per_region"], args["label_mapping"], args["auto_create_edge"]) - print cmd +--num-executors %s \ +--driver-memory 1g \ +--executor-memory 2g \ +--executor-cores 1 \ +%s \ +--input %s \ +--tmpPath /tmp/%s \ +--zkQuorum %s \ +--table %s \ +--dbUrl %s \ +--dbUser %s \ +--dbPassword %s \ +--maxHFilePerRegionServer %s \ +--labelMapping %s \ +--autoEdgeCreate %s""" % (args["num_executors"], + JAR, + args["input"], + args["htable_name"], + args["hbase_zk"], + args["htable_name"], + args["db_url"], + args["db_user"], + args["db_password"], + args["max_file_per_region"], + args["label_mapping"], + args["auto_create_edge"]) + print(cmd) ret = os.system(cmd) - print cmd, "return", ret + print(cmd, "return", ret) return ret def distcp(args): cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" % (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], args["htable_name"]) - print cmd + print(cmd) ret = os.system(cmd) - print cmd, "return", ret + print(cmd, "return", ret) return ret def chmod(args): cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"]) - print cmd + print(cmd) ret = os.system(cmd) - print cmd, "return", ret + print(cmd, "return", ret) return ret def load(args): - cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s /tmp/%s %s" % (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["htable_name"], args["htable_name"]) - print cmd + cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s /tmp/%s %s" % \ + (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["htable_name"], args["htable_name"]) + print(cmd) ret = os.system(cmd) - print cmd, "return", ret + print(cmd, "return", ret) return ret def send(msg): - print msg + print(msg) def run(args): cleanup(args) @@ -69,15 +95,15 @@ def run(args): if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret) else: send("[Success]: loader build hfile") - ret = distcp(args) - - if ret != 0: return send("[Failed]: loader distcp failed %s" % ret) - else: send("[Success]: loader distcp") - - ret = chmod(args) - - if ret != 0: return send("[Failed]: loader chmod failed %s" % ret) - else: send("[Success]: loader chmod") + # ret = distcp(args) + # + # if ret != 0: return send("[Failed]: loader distcp failed %s" % ret) + # else: send("[Success]: loader distcp") + # + # ret = chmod(args) + # + # if ret != 0: return send("[Failed]: loader chmod failed %s" % ret) + # else: send("[Success]: loader chmod") ret = load(args) @@ -86,23 +112,24 @@ def run(args): LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles" -JAR="loader/target/scala-2.10/s2loader-assembly-0.11.0-SNAPSHOT.jar" - 
+JAR="loader/target/scala-2.11/s2loader-assembly-0.2.1-SNAPSHOT.jar" args = { "HADOOP_CONF_DIR": "hdfs_conf_gasan", "HBASE_CONF_DIR": "hbase_conf_gasan", "htable_name": "test", -"hbase_namenode": "hdfs://nameservice:8020", +"hbase_namenode": "hdfs://localhost:8020", "hbase_zk": "localhost", "db_url": "jdbc:mysql://localhost:3306/graph_dev", +"db_user": "sa", +"db_password": "sa", "max_file_per_region": 1, "label_mapping": "none", "auto_create_edge": "false", "-m": 1, "-bandwidth": 10, "num_executors": 2, -"input": "/user/test.txt" +"input": "/tmp/test.txt" } run(args) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala index 1f68dc2..e177196 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala @@ -591,6 +591,7 @@ class HBaseContext(@transient private val sc: SparkContext, * * @param rdd The RDD we are bulk loading from * @param tableName The HBase table we are loading into + * @param startKeys * @param flatMap A flapMap function that will make every * row in the RDD * into N cells for the bulk load @@ -603,17 +604,16 @@ class HBaseContext(@transient private val sc: SparkContext, */ def bulkLoad[T](rdd:RDD[T], tableName: TableName, + startKeys: Array[Array[Byte]], flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], stagingDir:String, - familyHFileWriteOptionsMap: - util.Map[Array[Byte], FamilyHFileWriteOptions] = - new util.HashMap[Array[Byte], FamilyHFileWriteOptions], + familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] = new util.HashMap[Array[Byte], FamilyHFileWriteOptions], compactionExclude: Boolean = false, maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = { - val conn = ConnectionFactory.createConnection(config) - val regionLocator = conn.getRegionLocator(tableName) - val startKeys = regionLocator.getStartKeys +// val conn = ConnectionFactory.createConnection(config) +// val regionLocator = conn.getRegionLocator(tableName) +// val startKeys = regionLocator.getStartKeys val defaultCompressionStr = config.get("hfile.compression", Compression.Algorithm.NONE.getName) val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala index b818a3c..54949a8 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala @@ -192,6 +192,7 @@ object HBaseRDDFunctions */ def hbaseBulkLoad(hc: HBaseContext, tableName: TableName, + startKeys: Array[Array[Byte]], flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], stagingDir:String, familyHFileWriteOptionsMap: @@ -199,7 +200,7 @@ object HBaseRDDFunctions new util.HashMap[Array[Byte], FamilyHFileWriteOptions](), compactionExclude: Boolean = false, 
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = { - hc.bulkLoad(rdd, tableName, + hc.bulkLoad(rdd, tableName, startKeys, flatMap, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala index 6ecb070..90e8bbb 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -76,6 +76,16 @@ object GraphSubscriberHelper extends WithKafka { } } + def apply(_config: Config): Unit = { + config = _config + if (g == null) { + val ec = ExecutionContext.Implicits.global + g = new S2Graph(config)(ec) + management = new Management(g) + builder = g.elementBuilder + } + } + def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = { config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index 6aaf6fd..bfb5a96 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -19,44 +19,115 @@ package org.apache.s2graph.loader.subscriber -import org.apache.hadoop.hbase.client.Put +import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.client.ConnectionFactory import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat} -import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} -import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId, LabelWithDirection} -import org.apache.s2graph.loader.spark.{KeyFamilyQualifier, HBaseContext, FamilyHFileWriteOptions} +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement +import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId} +import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier} import org.apache.s2graph.spark.spark.SparkApp -import org.apache.spark.{SparkContext} +import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.hbase.async.{PutRequest} +import org.hbase.async.PutRequest import play.api.libs.json.Json + import scala.collection.JavaConversions._ object TransferToHFile extends SparkApp { - val usages = - s""" - |create HFiles for hbase table on zkQuorum specified. 
- |note that hbase table is created already and pre-splitted properly. - | - |params: - |0. input: hdfs path for tsv file(bulk format). - |1. output: hdfs path for storing HFiles. - |2. zkQuorum: running hbase cluster zkQuorum. - |3. tableName: table name for this bulk upload. - |4. dbUrl: db url for parsing to graph element. - """.stripMargin + var options:GraphFileOptions = _ + + case class GraphFileOptions(input: String = "", + tmpPath: String = s"/tmp/bulkload", + zkQuorum: String = "", + tableName: String = "", + dbUrl: String = "", + dbUser: String = "", + dbPassword: String = "", + maxHFilePerRegionServer: Int = 1, + numRegions: Int = 3, + labelMapping: Map[String, String] = Map.empty[String, String], + autoEdgeCreate: Boolean = false, + buildDegree: Boolean = false, + incrementalLoad: Boolean = false, + compressionAlgorithm: String = "NONE") { + def toConfigParams = { + Map( + "hbase.zookeeper.quorum" -> zkQuorum, + "db.default.url" -> dbUrl, + "db.default.user" -> dbUser, + "db.default.password" -> dbPassword + ) + } + } + + val parser = new scopt.OptionParser[GraphFileOptions]("run") { + + opt[String]('i', "input").required().action( (x, c) => + c.copy(input = x) ).text("hdfs path for tsv file(bulk format)") + + opt[String]('m', "tmpPath").required().action( (x, c) => + c.copy(tmpPath = x) ).text("temp hdfs path for storing HFiles") + + opt[String]('z', "zkQuorum").required().action( (x, c) => + c.copy(zkQuorum = x) ).text("zookeeper config for hbase") + + opt[String]('t', "table").required().action( (x, c) => + c.copy(tableName = x) ).text("table name for this bulk upload.") + + opt[String]('c', "dbUrl").required().action( (x, c) => + c.copy(dbUrl = x)).text("jdbc connection url.") + + opt[String]('u', "dbUser").required().action( (x, c) => + c.copy(dbUser = x)).text("database user name.") + + opt[String]('p', "dbPassword").required().action( (x, c) => + c.copy(dbPassword = x)).text("database password.") + + opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) => + c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer." + ) + + opt[Int]('n', "numRegions").action ( (x, c) => + c.copy(numRegions = x)).text("total numRegions(pre-split size) on table." + ) + + opt[String]('l', "labelMapping").action( (x, c) => + c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)") + + opt[Boolean]('d', "buildDegree").action( (x, c) => + c.copy(buildDegree = x)).text("generate degree values") + + opt[Boolean]('a', "autoEdgeCreate").action( (x, c) => + c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically") + + opt[Boolean]('c', "incrementalLoad").action( (x, c) => + c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not." + ) + } //TODO: Process AtomicIncrementRequest too. 
/** build key values */ case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) + private def toLabelMapping(lableMapping: String): Map[String, String] = { + (for { + token <- lableMapping.split(",") + inner = token.split(":") if inner.length == 2 + } yield { + (inner.head, inner.last) + }).toMap + } + private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = { val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e => @@ -83,14 +154,17 @@ object TransferToHFile extends SparkApp { output <- List(degreeKey -> 1L) ++ extra } yield output } + def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = { val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } } + def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = { val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } } + def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = { val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB.")) val dir = GraphUtil.directions(direction) @@ -101,7 +175,7 @@ object TransferToHFile extends SparkApp { val ts = System.currentTimeMillis() val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs) + val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs) edge.edgesWithIndex.flatMap { indexEdge => GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => @@ -115,13 +189,13 @@ object TransferToHFile extends SparkApp { (key, value) <- degreeKeyVals putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value) } yield { - val p = putRequest - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - kv - } + val p = putRequest + val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) + kv + } kvs.toIterator } - + def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { val kvList = new java.util.ArrayList[KeyValue] for (s <- strs) { @@ -143,71 +217,141 @@ object TransferToHFile extends SparkApp { val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) kvList.add(kv) } - } + } } } kvList.iterator() } - - - override def run() = { - val input = args(0) - val tmpPath = args(1) - val zkQuorum = args(2) - val tableName = args(3) - val dbUrl = args(4) - val maxHFilePerResionServer = args(5).toInt - val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String] - val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false - val buildDegree = if (args.length >= 9) args(8).toBoolean else true - val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4" - val conf = sparkConf(s"$input: TransferToHFile") - conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryoserializer.buffer.mb", "24") + def generateKeyValues(sc: SparkContext, + s2Config: Config, + input: RDD[String], + graphFileOptions: GraphFileOptions): RDD[KeyValue] = { + val kvs = input.mapPartitions { iter => + GraphSubscriberHelper.apply(s2Config) - val sc = new SparkContext(conf) - - GraphSubscriberHelper.management.createStorageTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm) - - /* set up hbase init */ - val hbaseConf = HBaseConfiguration.create() - hbaseConf.set("hbase.zookeeper.quorum", zkQuorum) - hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) - hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName") - - - val rdd = sc.textFile(input) - - - val kvs = rdd.mapPartitions { iter => - val phase = System.getProperty("phase") - GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") - toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate) + toKeyValues(iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate) } - val merged = if (!buildDegree) kvs + if (!graphFileOptions.buildDegree) kvs else { - kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => + kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) => agg + current }.mapPartitions { iter => - val phase = System.getProperty("phase") - GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") + GraphSubscriberHelper.apply(s2Config) + toKeyValues(iter.toSeq) } } + } + + def generateHFile(sc: SparkContext, + s2Config: Config, + kvs: RDD[KeyValue], + options: GraphFileOptions): Unit = { + val hbaseConfig = toHBaseConfig(options) + val startKeys = + if (options.incrementalLoad) { + // need hbase connection to existing table to figure out the ranges of regions. + getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName)) + } else { + // otherwise we do not need to initialize Connection to hbase cluster. + // only numRegions determine region's pre-split. 
+ getStartKeys(numRegions = options.numRegions) + } + + val hbaseSc = new HBaseContext(sc, hbaseConfig) - val hbaseSc = new HBaseContext(sc, hbaseConf) def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = { val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)) val v = CellUtil.cloneValue(kv) Seq((k -> v)).toIterator } - val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase, + + val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase + val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass, BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase) val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions) - hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap) + + hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.tmpPath, familyOptionsMap) + } + + def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = { + val conn = ConnectionFactory.createConnection(hbaseConfig) + val regionLocator = conn.getRegionLocator(tableName) + regionLocator.getStartKeys + } + + def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = { + val hbaseConf = HBaseConfiguration.create() + + hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum) + hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName) + hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}") + + hbaseConf + } + + def getStartKeys(numRegions: Int): Array[Array[Byte]] = { + val startKey = AsynchbaseStorageManagement.getStartKey(numRegions) + val endKey = AsynchbaseStorageManagement.getEndKey(numRegions) + if (numRegions < 3) { + throw new IllegalArgumentException("Must create at least three regions") + } else if (Bytes.compareTo(startKey, endKey) >= 0) { + throw new IllegalArgumentException("Start key must be smaller than end key") + } + val empty = new Array[Byte](0) + + if (numRegions == 3) { + Array(empty, startKey, endKey) + } else { + val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3) + if (splitKeys == null || splitKeys.length != numRegions - 1) { + throw new IllegalArgumentException("Unable to split key range into enough regions") + } + Array(empty) ++ splitKeys.toSeq + } + } + + override def run() = { + parser.parse(args, GraphFileOptions()) match { + case Some(o) => options = o + case None => + parser.showUsage() + throw new IllegalArgumentException("failed to parse options...") + } + + println(s">>> Options: ${options}") + val s2Config = Management.toConfig(options.toConfigParams) + + val conf = sparkConf(s"TransferToHFile") + + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryoserializer.buffer.mb", "24") + + val sc = new SparkContext(conf) + val rdd = sc.textFile(options.input) + + GraphSubscriberHelper.apply(s2Config) + + val merged = TransferToHFile.generateKeyValues(sc, s2Config, rdd, options) + generateHFile(sc, s2Config, merged, options) +// /* set up hbase init */ +// val hbaseSc = new HBaseContext(sc, toHBaseConfig(options)) +// +// def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = { +// val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)) +// val v = 
CellUtil.cloneValue(kv) +// Seq((k -> v)).toIterator +// } +// +// val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase, +// BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase) +// val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions) +// +// val startKeys = getStartKeys(numRegions = options.numRegions) +// hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), startKeys, flatMap, options.tmpPath, familyOptionsMap) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala new file mode 100644 index 0000000..6918ce4 --- /dev/null +++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.s2graph.loader.subscriber + +import java.io.PrintWriter +import java.util + +import com.typesafe.config.ConfigFactory +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles +import org.apache.hadoop.util.ToolRunner +import org.apache.s2graph.core.{Management, PostProcess} +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} +import org.apache.s2graph.core.storage.CanSKeyValue +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage +import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.loader.subscriber.TransferToHFile.options +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import play.api.libs.json.Json + +import scala.util.Try + +class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll { + + import TransferToHFile._ + import scala.collection.JavaConverters._ + + private val master = "local[2]" + private val appName = "example-spark" + + private var sc: SparkContext = _ + + /* TransferHFile parameters */ + val options = GraphFileOptions( + input = "/tmp/test.txt", + tmpPath = "/tmp/s2graph", + zkQuorum = "localhost", + dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL", + dbUser = "sa", + dbPassword = "sa", + tableName = "s2graph", + maxHFilePerRegionServer = 1, + numRegions = 3, + compressionAlgorithm = "NONE", + buildDegree = false, + autoEdgeCreate = false) + + val s2Config = Management.toConfig(options.toConfigParams) + + val tableName = options.tableName + val schemaVersion = HBaseType.DEFAULT_VERSION + val compressionAlgorithm: String = options.compressionAlgorithm + + override def beforeAll(): Unit = { + // initialize spark context. 
+ val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + sc = new SparkContext(conf) + + GraphSubscriberHelper.apply(s2Config) + } + + override def afterAll(): Unit = { + GraphSubscriberHelper.g.shutdown() + if (sc != null) { + sc.stop() + } + } + + private def writeToFile(fileName: String)(lines: Seq[String]): Unit = { + val writer = new PrintWriter(fileName) + lines.foreach(line => writer.write(line + "\n")) + writer.close + } + + private def initTestEdgeSchema(): Label = { + import scala.collection.JavaConverters._ + /* initialize model for test */ + val management = GraphSubscriberHelper.management + + val service = management.createService(serviceName = "s2graph", cluster = "localhost", + hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz") + + val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", Nil) + + Try { + management.createLabel("friends", serviceColumn, serviceColumn, isDirected = true, + serviceName = service.serviceName, indices = new java.util.ArrayList[Index], + props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")).asJava, consistencyLevel = "strong", hTableName = tableName, + hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = "") + } + + Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized.")) + } + + private def initTestVertexSchema(): ServiceColumn = { + import scala.collection.JavaConverters._ + /* initialize model for test */ + val management = GraphSubscriberHelper.management + + val service = management.createService(serviceName = "s2graph", cluster = "localhost", + hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz") + + management.createServiceColumn(service.serviceName, "imei", "string", + Seq( + Prop(name = "first_time", defaultValue = "''", dataType = "string"), + Prop(name = "last_time", defaultValue = "''", dataType = "string"), + Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"), + Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"), + Prop(name = "active_months", defaultValue = "-1", dataType = "integer"), + Prop(name = "fua", defaultValue = "''", dataType = "string"), + Prop(name = "location_often_province", defaultValue = "''", dataType = "string"), + Prop(name = "location_often_city", defaultValue = "''", dataType = "string"), + Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"), + Prop(name = "location_last_province", defaultValue = "''", dataType = "string"), + Prop(name = "location_last_city", defaultValue = "''", dataType = "string"), + Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer") + )) + } + + test("test generateKeyValues edge only.") { + import scala.collection.JavaConverters._ + import org.apache.s2graph.core.storage.CanSKeyValue._ + + val label = initTestEdgeSchema() + /* end of initialize model */ + + val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}" + val input = sc.parallelize(Seq(bulkEdgeString)) + + val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) + + val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList + + val serDe = GraphSubscriberHelper.g.defaultStorage.serDe + + // val snapshotEdgeOpt = serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.head), None) + // val 
indexEdgeOpt = serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.last), None) + + val bulkEdge = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get + + val indexEdges = ls.flatMap { kv => + serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None) + } + + val indexEdge = indexEdges.head + + bulkEdge shouldBe(indexEdge) + } + + + test("test generateKeyValues vertex only.") { + val serviceColumn = initTestVertexSchema() + val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" + val bulkVertex = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get + + val input = sc.parallelize(Seq(bulkVertexString)) + + val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) + + val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList + + val serDe = GraphSubscriberHelper.g.defaultStorage.serDe + + + + val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get + + PostProcess.s2VertexToJson(vertex).foreach { jsValue => + println(Json.prettyPrint(jsValue)) + } + + bulkVertex shouldBe(vertex) + } + + test("test generateHFile vertex only.") { + val serviceColumn = initTestVertexSchema() + + val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" + val input = sc.parallelize(Seq(bulkVertexString)) + + val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) + TransferToHFile.generateHFile(sc, s2Config, kvs, options) + } + + test("test loader script.") { + val serviceColumn = initTestVertexSchema() + + val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" + val bulkVertexLs = Seq(bulkVertexString) + writeToFile(options.input)(bulkVertexLs) + + val input = sc.parallelize(bulkVertexLs) + GraphSubscriberHelper.apply(s2Config) + val graph = GraphSubscriberHelper.g + val vertex = graph.elementBuilder.toVertex(bulkVertexString).get + + val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options) + TransferToHFile.generateHFile(sc, s2Config, kvs, options) + + val hfileArgs = Array(options.tmpPath, options.tableName) + val hbaseConfig = HBaseConfiguration.create() + + val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) + + val vertexId = graph.elementBuilder.newVertexId("s2graph")("imei")("800188448586078") + val vertexOpt = graph.getVertex(vertexId) + + vertexOpt.isDefined 
shouldBe(true) + vertexOpt.get shouldBe (vertex) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/project/Common.scala ---------------------------------------------------------------------- diff --git a/project/Common.scala b/project/Common.scala index d3b8d93..e2323a2 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -21,6 +21,7 @@ import sbt._ object Common { val sparkVersion = "1.4.1" + val spark2Version = "2.3.0" val playVersion = "2.5.9" val specs2Version = "3.8.5" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala index c8c25b3..0478413 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala @@ -52,7 +52,7 @@ class GraphElementBuilder(graph: S2GraphLike) { element } recover { case e: Exception => - logger.error(s"[toElement]: $e", e) + logger.error(s"[toElement]: $s", e) None } get http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala index 9de3d9d..b92e47b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala @@ -344,8 +344,9 @@ object JSONParser { def fromJsonToProperties(jsObject: JsObject): Map[String, Any] = { val kvs = for { (k, v) <- jsObject.fieldSet + anyVal <- jsValueToAny(v) } yield { - k -> jsValueToString(v) + k -> anyVal } kvs.toMap } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala index 819c378..95c3c7b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -121,11 +121,13 @@ case class ServiceColumn(id: Option[Int], lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType) def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = { - for { + val ret = for { (k, v) <- props labelMeta <- metasInvMap.get(k) innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion) } yield labelMeta -> innerVal + + ret } def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala index 775afda..57adc8a 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -65,6 +65,10 @@ object CanSKeyValue { SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp()) } + implicit val hbaseKeyValue = instance[org.apache.hadoop.hbase.KeyValue] { kv => + SKeyValue(Array.empty[Byte], kv.getRow, kv.getFamily, kv.getQualifier, kv.getValue, kv.getTimestamp) + } + // For asyncbase KeyValues implicit val sKeyValue = instance[SKeyValue](identity) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala index 32d640c..78da629 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala @@ -70,7 +70,7 @@ trait StorageSerDe { **/ def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] - def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable + def indexEdgeDeserializer(schemaVer: String): Deserializable[S2EdgeLike] def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala index 0fb3173..8475ba6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala @@ -56,6 +56,14 @@ object AsynchbaseStorageManagement { val DefaultCreateTableOptions = Map( "hbase.zookeeper.quorum" -> "localhost" ) + + def getStartKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount)) + } + + def getEndKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) + } } class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClient]) extends StorageManagement { @@ -271,12 +279,4 @@ class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClie conn.getAdmin } } - - private def getStartKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount)) - } - - private def getEndKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/build.sbt ---------------------------------------------------------------------- diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt new file mode 100644 index 0000000..b915238 --- /dev/null +++ b/s2jobs/build.sbt @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import Common._ + +name := "s2jobs" + +scalacOptions ++= Seq("-deprecation") + +projectDependencies := Seq( + (projectID in "s2core").value exclude("org.mortbay.jetty", "j*") exclude("javax.xml.stream", "s*") exclude("javax.servlet", "s*") exclude("javax.servlet", "j*") +) + +libraryDependencies ++= Seq( + "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility + "org.apache.spark" %% "spark-core" % spark2Version, + "org.apache.spark" %% "spark-streaming" % spark2Version % "provided", + "org.apache.spark" %% "spark-hive" % spark2Version % "provided", + "org.specs2" %% "specs2-core" % specs2Version % "test", + "org.scalatest" %% "scalatest" % "2.2.1" % "test", + "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion, + "com.github.scopt" %% "scopt" % "3.7.0" +) + +crossScalaVersions := Seq("2.10.6") + +mergeStrategy in assembly := { + case PathList("META-INF", ps @ _*) => MergeStrategy.discard + case _ => MergeStrategy.first +} + +excludedJars in assembly := { + val cp = (fullClasspath in assembly).value + cp filter {_.data.getName == "guava-16.0.1.jar"} +} + +test in assembly := {} + +parallelExecution in Test := false + +mainClass in (Compile) := None http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/loader.py ---------------------------------------------------------------------- diff --git a/s2jobs/loader.py b/s2jobs/loader.py new file mode 100644 index 0000000..d3a9e67 --- /dev/null +++ b/s2jobs/loader.py @@ -0,0 +1,153 @@ +#!/usr/bin/python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os, sys +#, urllib2, urllib + +def cleanup(args): + cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"] + print(cmd) + ret = os.system(cmd) + print(cmd, "return", ret) + return ret + +def hfile(args): + print(args) + cmd = """HADOOP_CONF_DIR=%s spark-submit --class "org.apache.s2graph.s2jobs.loader.GraphFileGenerator" \ +--name "GraphFileGenerator@shon" \ +--conf "spark.task.maxFailures=20" \ +--conf "spark.executor.extraClassPath=%s" \ +--conf "spark.driver.extraClassPath=%s" \ +--jars %s \ +--master local[2] \ +--num-executors %s \ +--driver-memory 1g \ +--executor-memory 2g \ +--executor-cores 1 \ +%s \ +--input %s \ +--tempDir %s \ +--output /tmp/%s \ +--zkQuorum %s \ +--table %s \ +--dbUrl '%s' \ +--dbUser %s \ +--dbPassword %s \ +--dbDriver %s \ +--maxHFilePerRegionServer %s \ +--labelMapping %s \ +--autoEdgeCreate %s""" % (args["HADOOP_CONF_DIR"], + MYSQL_JAR, + MYSQL_JAR, + MYSQL_JAR, + args["num_executors"], + JAR, + args["input"], + args["tempDir"], + args["htable_name"], + args["hbase_zk"], + args["htable_name"], + args["db_url"], + args["db_user"], + args["db_password"], + args["db_driver"], + args["max_file_per_region"], + args["label_mapping"], + args["auto_create_edge"]) + print(cmd) + ret = os.system(cmd) + print(cmd, "return", ret) + return ret + +def distcp(args): + cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" % (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], args["htable_name"]) + print(cmd) + ret = os.system(cmd) + print(cmd, "return", ret) + return ret + +def chmod(args): + cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"]) + print(cmd) + ret = os.system(cmd) + print(cmd, "return", ret) + return ret + +def load(args): + cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s /tmp/%s %s" % \ + (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["htable_name"], args["htable_name"]) + print(cmd) + ret = os.system(cmd) + print(cmd, "return", ret) + return ret + +def send(msg): + print(msg) + +def run(args): + cleanup(args) + send("[Start]: bulk loader") + ret = hfile(args) + + if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret) + else: send("[Success]: loader build hfile") + + # ret = distcp(args) + # + # if ret != 0: return send("[Failed]: loader distcp failed %s" % ret) + # else: send("[Success]: loader distcp") + # + # ret = chmod(args) + # + # if ret != 0: return send("[Failed]: loader chmod failed %s" % ret) + # else: send("[Success]: loader chmod") + + ret = load(args) + + if ret != 0: return send("[Failed]: loader complete bulkload failed %s" % ret) + else: send("[Success]: loader complete bulkload") + + +LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles" +JAR="s2jobs/target/scala-2.11/s2jobs-assembly-0.2.1-SNAPSHOT.jar" +MYSQL_JAR="/Users/shon/Downloads/mysql-connector-java-5.1.28.jar" +MYSQL_CLASSPATH="/Users/shon/Downloads/mysql-connector-java-5.1.28.jar" +DB_DRIVER="com.mysql.jdbc.Driver" +DB_URL="jdbc:mysql://localhost:3306/graph_dev" +# DB_URL="jdbc:h2:file:./var/metastore;MODE=MYSQL" +args = { + "HADOOP_CONF_DIR": "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop", + "HBASE_CONF_DIR": "/usr/local/opt/hbase/libexec/conf", + "htable_name": "test", + "hbase_namenode": "hdfs://localhost:8020", + "hbase_zk": "localhost", + "db_driver": DB_DRIVER, + "db_url": DB_URL, + "db_user": "graph", + "db_password": "graph", + 
"max_file_per_region": 1, + "label_mapping": "none", + "auto_create_edge": "false", + "-m": 1, + "-bandwidth": 10, + "num_executors": 2, + "input": "/tmp/imei-20.txt", + "tempDir": "/tmp/bulkload_tmp" +} + +run(args) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java b/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java new file mode 100644 index 0000000..4d6599f --- /dev/null +++ b/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.net.URLEncoder; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; + +public class GraphHFileOutputFormat extends HFileOutputFormat2 { + private static final String COMPRESSION_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.compression"; + private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.bloomtype"; + private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.blocksize"; + private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; + + // This constant is public since the client can modify this when setting + // up their conf object and thus refer to this symbol. + // It is present for backwards compatibility reasons. Use it only to + // override the auto-detection of datablock encoding. 
+  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
+  static Log LOG = LogFactory.getLog(GraphHFileOutputFormat.class);
+
+  public static void configureIncrementalLoad(Job job, List<ImmutableBytesWritable> startKeys,
+      List<String> familyNames, Compression.Algorithm compression, BloomType bloomType,
+      int blockSize, DataBlockEncoding dataBlockEncoding) throws IOException {
+
+    Configuration conf = job.getConfiguration();
+
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat2.class);
+
+    // Based on the configured map output class, set the correct reducer to properly
+    // sort the incoming values.
+    // TODO it would be nice to pick one or the other of these formats.
+    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(KeyValueSortReducer.class);
+    } else if (Put.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
+    } else {
+      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+    }
+
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+
+    job.setNumReduceTasks(startKeys.size());
+
+    configurePartitioner(job, startKeys);
+    // Set compression algorithms based on column families
+    configureCompression(familyNames, compression, conf);
+    configureBloomType(familyNames, bloomType, conf);
+    configureBlockSize(familyNames, blockSize, conf);
+    configureDataBlockEncoding(familyNames, dataBlockEncoding, conf);
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
+    LOG.info("Incremental table output configured.");
+  }
+
+  static void configureCompression(List<String> familyNames, Compression.Algorithm compression,
+      Configuration conf) throws IOException {
+    StringBuilder compressionConfigValue = new StringBuilder();
+    int i = 0;
+    for (String familyName : familyNames) {
+      if (i++ > 0) {
+        compressionConfigValue.append('&');
+      }
+      compressionConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+      compressionConfigValue.append('=');
+      compressionConfigValue.append(URLEncoder.encode(compression.getName(), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+  }
+
+  static void configureBloomType(List<String> familyNames, BloomType bloomType, Configuration conf)
+      throws IOException {
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    int i = 0;
+    for (String familyName : familyNames) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      String bloomTypeStr = bloomType.toString();
+      if (bloomTypeStr == null) {
+        bloomTypeStr = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomTypeStr, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
+  }
+
+  static void configureBlockSize(List<String> familyNames, int blockSize, Configuration conf)
+      throws IOException {
+    StringBuilder blockSizeConfigValue = new StringBuilder();
+    int i = 0;
+    for (String familyName : familyNames) {
+      if (i++ > 0) {
+        blockSizeConfigValue.append('&');
+      }
+      blockSizeConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+      blockSizeConfigValue.append('=');
+      blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(blockSize), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
+  }
+
+  static void configureDataBlockEncoding(List<String> familyNames, DataBlockEncoding encoding,
+      Configuration conf) throws IOException {
+    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
+    int i = 0;
+    for (String familyName : familyNames) {
+      if (i++ > 0) {
+        dataBlockEncodingConfigValue.append('&');
+      }
+      dataBlockEncodingConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+      dataBlockEncodingConfigValue.append('=');
+      if (encoding == null) {
+        encoding = DataBlockEncoding.NONE;
+      }
+      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
+    }
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
+  }
+
+}
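
The four configure* helpers above all write the same wire format into the job configuration: URL-encoded "family=value" pairs joined by '&', which HFileOutputFormat2 later decodes per column family. A minimal illustrative sketch of that string for S2Graph's two column families "e" and "v" (the NONE compression value here is an example, not taken from this patch):

  // Builds e.g. "e=NONE&v=NONE" -- the same shape configureCompression() writes into the conf.
  val families = Seq("e", "v")
  val encoded = families
    .map { family =>
      java.net.URLEncoder.encode(family, "UTF-8") + "=" + java.net.URLEncoder.encode("NONE", "UTF-8")
    }
    .mkString("&")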

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
new file mode 100644
index 0000000..ef76608
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+
+import scala.concurrent.ExecutionContext
+
+object S2GraphHelper {
+  def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+    new S2Graph(config)
+  }
+}
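
S2GraphHelper.initS2Graph is the only entry point the Spark-side code uses to obtain a graph handle; the generators below call it once per partition, so each executor builds its own S2Graph from the typesafe Config instead of shipping one from the driver. A minimal sketch of that pattern, assuming illustrative names (parseLines is hypothetical and not part of this patch):

  // Hypothetical usage: parse bulk-format lines into graph elements on the executors.
  def parseLines(lines: org.apache.spark.rdd.RDD[String],
                 s2Config: com.typesafe.config.Config) =
    lines.mapPartitions { iter =>
      val s2 = S2GraphHelper.initS2Graph(s2Config)   // one S2Graph per partition
      iter.flatMap(line => s2.elementBuilder.toGraphElement(line, Map.empty[String, String]).toSeq)
    }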

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
new file mode 100644
index 0000000..51476c1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import org.apache.s2graph.core._
+import org.apache.spark.{SparkConf, SparkContext}
+
+object GraphFileGenerator {
+  def main(args: Array[String]): Unit = {
+    val options = GraphFileOptions.toOption(args)
+
+    val s2Config = Management.toConfig(options.toConfigParams)
+
+    val conf = new SparkConf()
+    conf.setAppName(this.getClass.getSimpleName)
+    val sc = new SparkContext(conf)
+
+    val input = sc.textFile(options.input)
+    options.method match {
+      case "MR" => HFileMRGenerator.generate(sc, s2Config, input, options)
+      case "SPARK" => HFileGenerator.generate(sc, s2Config, input, options)
+      case _ => throw new IllegalArgumentException("only supported type is MR/SPARK.")
+    }
+  }
+}
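
GraphFileGenerator reads options.input as plain text lines in the bulk-load TSV format the existing loader consumes: timestamp, operation, element type, source, target, label and a JSON property object, separated by tabs. An illustrative line built in Scala, with made-up ids, label and properties (assumed format, not taken from this patch):

  // Made-up example of one bulk-format edge line fed to sc.textFile(options.input).
  val sampleEdge = Seq(
    "1520316110000",        // timestamp (ms)
    "insert",               // operation
    "edge",                 // element type
    "alice", "bob",         // source / target vertex ids
    "friend",               // label name (must exist in the meta DB)
    """{"score": 1.0}"""    // properties as JSON
  ).mkString("\t")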

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
new file mode 100644
index 0000000..3e3ffb9
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+object GraphFileOptions {
+  val parser = new scopt.OptionParser[GraphFileOptions]("run") {
+
+    opt[String]('i', "input").required().action( (x, c) =>
+      c.copy(input = x) ).text("hdfs path for tsv file(bulk format)")
+
+    opt[String]("tempDir").required().action( (x, c) =>
+      c.copy(tempDir = x) ).text("temp hdfs path for staging HFiles")
+
+    opt[String]('o', "output").required().action( (x, c) =>
+      c.copy(output = x) ).text("output hdfs path for storing HFiles")
+
+    opt[String]('z', "zkQuorum").required().action( (x, c) =>
+      c.copy(zkQuorum = x) ).text("zookeeper config for hbase")
+
+    opt[String]('t', "table").required().action( (x, c) =>
+      c.copy(tableName = x) ).text("table name for this bulk upload.")
+
+    opt[String]('c', "dbUrl").required().action( (x, c) =>
+      c.copy(dbUrl = x)).text("jdbc connection url.")
+
+    opt[String]('u', "dbUser").required().action( (x, c) =>
+      c.copy(dbUser = x)).text("database user name.")
+
+    opt[String]('p', "dbPassword").required().action( (x, c) =>
+      c.copy(dbPassword = x)).text("database password.")
+
+    opt[String]('r', "dbDriver").action( (x, c) =>
+      c.copy(dbDriver = x)).text("jdbc driver class.")
+
+    opt[Int]('h', "maxHFilePerRegionServer").action( (x, c) =>
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFiles per RegionServer.")
+
+    opt[Int]('n', "numRegions").action( (x, c) =>
+      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table.")
+
+    opt[String]('l', "labelMapping").action( (x, c) =>
+      c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
+
+    opt[Boolean]('d', "buildDegree").action( (x, c) =>
+      c.copy(buildDegree = x)).text("generate degree values")
+
+    opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
+      c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
+
+    opt[Boolean]("incrementalLoad").action( (x, c) =>
+      c.copy(incrementalLoad = x)).text("whether to run an incremental bulkload that appends data to an existing table.")
+
+    opt[String]('m', "method").action( (x, c) =>
+      c.copy(method = x)).text("run method. currently MR(default)/SPARK supported.")
+  }
+
+  def toOption(args: Array[String]): GraphFileOptions = {
+    parser.parse(args, GraphFileOptions()) match {
+      case Some(o) => o
+      case None =>
+        parser.showUsage()
+        throw new IllegalArgumentException("failed to parse options...")
+    }
+  }
+
+  private def toLabelMapping(labelMapping: String): Map[String, String] = {
+    (for {
+      token <- labelMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
+}
+/**
+ * Options for the graph file generators (ported from TransferToHFile).
+ * @param input
+ * @param tempDir
+ * @param output
+ * @param zkQuorum
+ * @param tableName
+ * @param dbUrl
+ * @param dbUser
+ * @param dbPassword
+ * @param dbDriver
+ * @param maxHFilePerRegionServer
+ * @param numRegions
+ * @param labelMapping
+ * @param autoEdgeCreate
+ * @param buildDegree
+ * @param incrementalLoad
+ * @param compressionAlgorithm
+ * @param method
+ */
+case class GraphFileOptions(input: String = "",
+                            tempDir: String = "",
+                            output: String = s"/tmp/bulkload",
+                            zkQuorum: String = "",
+                            tableName: String = "",
+                            dbUrl: String = "",
+                            dbUser: String = "",
+                            dbPassword: String = "",
+                            dbDriver: String = "org.h2.Driver",
+                            maxHFilePerRegionServer: Int = 1,
+                            numRegions: Int = 3,
+                            labelMapping: Map[String, String] = Map.empty[String, String],
+                            autoEdgeCreate: Boolean = false,
+                            buildDegree: Boolean = false,
+                            incrementalLoad: Boolean = false,
+                            compressionAlgorithm: String = "NONE",
+                            method: String = "SPARK") {
+  def toConfigParams = {
+    Map(
+      "hbase.zookeeper.quorum" -> zkQuorum,
+      "db.default.url" -> dbUrl,
+      "db.default.user" -> dbUser,
+      "db.default.password" -> dbPassword,
+      "db.default.driver" -> dbDriver
+    )
+  }
+}
\ No newline at end of file
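
For reference, a hedged sketch of how these options might be parsed and handed to a generator from user code; every path, quorum and JDBC value below is a placeholder, not something taken from this patch:

  // Hypothetical arguments; all values are placeholders.
  val args = Array(
    "--input",        "/user/s2graph/input/edges.tsv",
    "--tempDir",      "/user/s2graph/tmp",
    "--output",       "/user/s2graph/bulkload",
    "--zkQuorum",     "zk1.example.com,zk2.example.com",
    "--table",        "s2graph",
    "--dbUrl",        "jdbc:mysql://meta.example.com:3306/graph_dev",
    "--dbUser",       "graph",
    "--dbPassword",   "graph",
    "--labelMapping", "friend_src:friend",
    "--method",       "SPARK"
  )
  val options = GraphFileOptions.toOption(args)
  // options.toConfigParams now carries the zkQuorum/db settings that Management.toConfig(...) consumes.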

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
new file mode 100644
index 0000000..9c3de2a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.spark._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.hbase.async.PutRequest
+import play.api.libs.json.Json
+
+object HFileGenerator extends RawFileGenerator {
+
+  import scala.collection.JavaConverters._
+
+  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
+    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+
+    buildPutRequests(s2, edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
+      e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(s2, indexEdge) }
+    }
+  }
+
+  def buildPutRequests(s2: S2Graph, snapshotEdge: SnapshotEdge): List[PutRequest] = {
+    val kvs = s2.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
+    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
+  }
+
+  def buildPutRequests(s2: S2Graph, indexEdge: IndexEdge): List[PutRequest] = {
+    val kvs = s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
+    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
+  }
+
+  def buildDegreePutRequests(s2: S2Graph, vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
+    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
+    val dir = GraphUtil.directions(direction)
+    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
+      throw new RuntimeException(s"$vertexId can not be converted into innerval")
+    }
+    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
+    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
+
+    edge.edgesWithIndex.flatMap { indexEdge =>
+      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
+        new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
+      }
+    }
+  }
+
+  def toKeyValues(s2: S2Graph, degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = {
+    val kvs = for {
+      (key, value) <- degreeKeyVals
+      putRequest <- buildDegreePutRequests(s2, key.vertexIdStr, key.labelName, key.direction, value)
+    } yield {
+      val p = putRequest
+      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+      kv
+    }
+    kvs.toIterator
+  }
+
+  def toKeyValues(s2: S2Graph, strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
+    val kvList = new java.util.ArrayList[KeyValue]
+    for (s <- strs) {
+      val elementList = s2.elementBuilder.toGraphElement(s, labelMapping).toSeq
+      for (element <- elementList) {
+        if (element.isInstanceOf[S2Edge]) {
+          val edge = element.asInstanceOf[S2Edge]
+          val putRequestList = insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
+          for (p <- putRequestList) {
+            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+            kvList.add(kv)
+          }
+        } else if (element.isInstanceOf[S2Vertex]) {
+          val vertex = element.asInstanceOf[S2Vertex]
+          val putRequestList = s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues.map { kv =>
+            new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)
+          }
+          for (p <- putRequestList) {
+            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+            kvList.add(kv)
+          }
+        }
+      }
+    }
+    kvList.iterator().asScala
+  }
+
+
+  def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = {
+    val conn = ConnectionFactory.createConnection(hbaseConfig)
+    val regionLocator = conn.getRegionLocator(tableName)
+    regionLocator.getStartKeys
+  }
+
+  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
+    val hbaseConf = HBaseConfiguration.create()
+
+    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
+
+    hbaseConf
+  }
+
+  def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
+    val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
+    val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
+    if (numRegions < 3) {
+      throw new IllegalArgumentException("Must create at least three regions")
+    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
+      throw new IllegalArgumentException("Start key must be smaller than end key")
+    }
+    val empty = new Array[Byte](0)
+
+    if (numRegions == 3) {
+      Array(empty, startKey, endKey)
+    } else {
+      val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3)
+      if (splitKeys == null || splitKeys.length != numRegions - 1) {
+        throw new IllegalArgumentException("Unable to split key range into enough regions")
+      }
+      Array(empty) ++ splitKeys.toSeq
+    }
+  }
+
+  def transfer(sc: SparkContext,
+               s2Config: Config,
+               input: RDD[String],
+               graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
+    val kvs = input.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(s2Config)
+
+      val s = toKeyValues(s2, iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate)
+      s
+    }
+
+    if (!graphFileOptions.buildDegree) kvs
+    else {
+      kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
+        agg + current
+      }.mapPartitions { iter =>
+        val s2 = S2GraphHelper.initS2Graph(s2Config)
+
+        toKeyValues(s2, iter.toSeq)
+      }
+    }
+  }
+
+  def generateHFile(sc: SparkContext,
+                    s2Config: Config,
+                    kvs: RDD[KeyValue],
+                    options: GraphFileOptions): Unit = {
+    val hbaseConfig = toHBaseConfig(options)
+    val startKeys =
+      if (options.incrementalLoad) {
+        // need hbase connection to existing table to figure out the ranges of regions.
+        getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
+      } else {
+        // otherwise we do not need a Connection to the hbase cluster;
+        // numRegions alone determines the table's pre-split regions.
+        getStartKeys(numRegions = options.numRegions)
+      }
+
+    val hbaseSc = new HBaseContext(sc, hbaseConfig)
+
+    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
+      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
+      val v = CellUtil.cloneValue(kv)
+      Seq((k -> v)).toIterator
+    }
+
+    val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
+    val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
+      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
+
+    hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.output, familyOptionsMap.asJava)
+  }
+
+  override def generate(sc: SparkContext,
+                        config: Config,
+                        rdd: RDD[String],
+                        _options: GraphFileOptions): Unit = {
+    val kvs = transfer(sc, config, rdd, _options)
+    generateHFile(sc, config, kvs, _options)
+  }
+}
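
generateHFile only writes HFiles under options.output; serving the data still requires handing those files to HBase. A hedged sketch of that final step using HBase's standard completebulkload tooling (the class and method names are from stock HBase 1.x, not from this patch, and completeBulkLoad itself is a hypothetical helper):

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.ConnectionFactory
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

  // Hypothetical helper; table name, quorum and HFile dir would come from GraphFileOptions in practice.
  def completeBulkLoad(hfileDir: String, tableName: String, zkQuorum: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = TableName.valueOf(tableName)
      new LoadIncrementalHFiles(conf).doBulkLoad(
        new Path(hfileDir), conn.getAdmin, conn.getTable(table), conn.getRegionLocator(table))
    } finally conn.close()
  }

Equivalently, the same step can be run from the shell with "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <output dir> <table>".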
