Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 558264490 -> c30531bcd
initial commit.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e42f7b22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e42f7b22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e42f7b22

Branch: refs/heads/master
Commit: e42f7b22f26c7d725f9a33ff6badd7942768e478
Parents: 32eb344
Author: DO YUNG YOON <[email protected]>
Authored: Thu Jun 14 19:20:51 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Jun 14 19:20:51 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/GraphUtil.scala     |   9 ++
 .../scala/org/apache/s2graph/core/S2Graph.scala |  65 ++++++------
 .../apache/s2graph/core/S2GraphConfigs.scala    | 103 +++++++++++++++++++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  45 +++++---
 .../s2jobs/loader/HFileMRGenerator.scala        |   2 +-
 .../loader/LocalBulkLoaderTransformer.scala     |   7 +-
 .../loader/SparkBulkLoaderTransformer.scala     |   4 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  56 +++++++---
 .../spark/sql/streaming/S2SinkConfigs.scala     |  25 ++++-
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |   8 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  |   6 +-
 11 files changed, 259 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
index e08bb4e..7f74032 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -171,4 +171,13 @@ object GraphUtil {
   def stringToOption(s: String): Option[String] = {
     Option(s).filter(_.trim.nonEmpty)
   }
+
+  def toLabelMapping(labelMapping: String): Map[String, String] = {
+    (for {
+      token <- labelMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
 }

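The new GraphUtil.toLabelMapping helper above parses a comma-separated list of colon-delimited pairs and silently drops malformed tokens. A minimal usage sketch (the label names are made up for illustration):

    import org.apache.s2graph.core.GraphUtil

    val mapping = GraphUtil.toLabelMapping("friend:friend_v2,follow:follow_v2")
    // Map("friend" -> "friend_v2", "follow" -> "follow_v2")
    val empty = GraphUtil.toLabelMapping("not-a-pair")
    // Map() -- tokens without exactly one ':' are ignored
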
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index d41bc24..dabbf80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -51,42 +51,39 @@ object S2Graph {
   val DefaultScore = 1.0
   val FetchAllLimit = 10000000
   val DefaultFetchLimit = 1000
+  import S2GraphConfigs._
   private val DefaultConfigs: Map[String, AnyRef] = Map(
-    "hbase.zookeeper.quorum" -> "localhost",
-    "hbase.table.name" -> "s2graph",
-    "hbase.table.compression.algorithm" -> "gz",
-    "phase" -> "dev",
-    "db.default.driver" -> "org.h2.Driver",
-    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-    "db.default.password" -> "graph",
-    "db.default.user" -> "graph",
-    "cache.max.size" -> java.lang.Integer.valueOf(0),
-    "cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
-    "resource.cache.max.size" -> java.lang.Integer.valueOf(1000),
-    "resource.cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
-    "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
-    "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
-    "hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000),
-    "max.retry.number" -> java.lang.Integer.valueOf(100),
-    "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
-    "max.back.off" -> java.lang.Integer.valueOf(100),
-    "back.off.timeout" -> java.lang.Integer.valueOf(1000),
-    "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
-    "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
-    "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
-    "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
-    "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
-    "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "s2graph.storage.backend" -> "hbase",
-    "query.hardlimit" -> java.lang.Integer.valueOf(100000),
-    "hbase.zookeeper.znode.parent" -> "/hbase",
-    "query.log.sample.rate" -> Double.box(0.05)
+    S2GRAPH_STORE_BACKEND -> DEFAULT_S2GRAPH_STORE_BACKEND,
+    PHASE -> DEFAULT_PHASE,
+    HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> HBaseConfigs.DEFAULT_HBASE_ZOOKEEPER_QUORUM,
+    HBaseConfigs.HBASE_ZOOKEEPER_ZNODE_PARENT -> HBaseConfigs.DEFAULT_HBASE_ZOOKEEPER_ZNODE_PARENT,
+    HBaseConfigs.HBASE_TABLE_NAME -> HBaseConfigs.DEFAULT_HBASE_TABLE_NAME,
+    HBaseConfigs.HBASE_TABLE_COMPRESSION_ALGORITHM -> HBaseConfigs.DEFAULT_HBASE_TABLE_COMPRESSION_ALGORITHM,
+    HBaseConfigs.HBASE_CLIENT_RETRIES_NUMBER -> HBaseConfigs.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER,
+    HBaseConfigs.HBASE_RPCS_BUFFERED_FLUSH_INTERVAL -> HBaseConfigs.DEFAULT_HBASE_RPCS_BUFFERED_FLUSH_INTERVAL,
+    HBaseConfigs.HBASE_RPC_TIMEOUT -> HBaseConfigs.DEFAULT_HBASE_RPC_TIMEOUT,
+    DBConfigs.DB_DEFAULT_DRIVER -> DBConfigs.DEFAULT_DB_DEFAULT_DRIVER,
+    DBConfigs.DB_DEFAULT_URL -> DBConfigs.DEFAULT_DB_DEFAULT_URL,
+    DBConfigs.DB_DEFAULT_PASSWORD -> DBConfigs.DEFAULT_DB_DEFAULT_PASSWORD,
+    DBConfigs.DB_DEFAULT_USER -> DBConfigs.DEFAULT_DB_DEFAULT_USER,
+    CacheConfigs.CACHE_MAX_SIZE -> CacheConfigs.DEFAULT_CACHE_MAX_SIZE,
+    CacheConfigs.CACHE_TTL_SECONDS -> CacheConfigs.DEFAULT_CACHE_TTL_SECONDS,
+    ResourceCacheConfigs.RESOURCE_CACHE_MAX_SIZE -> ResourceCacheConfigs.DEFAULT_RESOURCE_CACHE_MAX_SIZE,
+    ResourceCacheConfigs.RESOURCE_CACHE_TTL_SECONDS -> ResourceCacheConfigs.DEFAULT_RESOURCE_CACHE_TTL_SECONDS,
+    MutatorConfigs.MAX_RETRY_NUMBER -> MutatorConfigs.DEFAULT_MAX_RETRY_NUMBER,
+    MutatorConfigs.LOCK_EXPIRE_TIME -> MutatorConfigs.DEFAULT_LOCK_EXPIRE_TIME,
+    MutatorConfigs.MAX_BACK_OFF -> MutatorConfigs.DEFAULT_MAX_BACK_OFF,
+    MutatorConfigs.BACK_OFF_TIMEOUT -> MutatorConfigs.DEFAULT_BACK_OFF_TIMEOUT,
+    MutatorConfigs.HBASE_FAIL_PROB -> MutatorConfigs.DEFAULT_HBASE_FAIL_PROB,
+    MutatorConfigs.DELETE_ALL_FETCH_SIZE -> MutatorConfigs.DEFAULT_DELETE_ALL_FETCH_SIZE,
+    MutatorConfigs.DELETE_ALL_FETCH_COUNT -> MutatorConfigs.DEFAULT_DELETE_ALL_FETCH_COUNT,
+    FutureCacheConfigs.FUTURE_CACHE_MAX_SIZE -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_MAX_SIZE,
+    FutureCacheConfigs.FUTURE_CACHE_EXPIRE_AFTER_WRITE -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_WRITE,
+    FutureCacheConfigs.FUTURE_CACHE_EXPIRE_AFTER_ACCESS -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_ACCESS,
+    FutureCacheConfigs.FUTURE_CACHE_METRIC_INTERVAL -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_METRIC_INTERVAL,
+    QueryConfigs.QUERY_HARDLIMIT -> QueryConfigs.DEFAULT_QUERY_HARDLIMIT,
+    LogConfigs.QUERY_LOG_SAMPLE_RATE -> LogConfigs.DEFAULT_QUERY_LOG_SAMPLE_RATE
   )

   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)

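Since the default keys are now named constants in S2GraphConfigs, override configs can be built from the same constants rather than raw strings. A small sketch, assuming only the quorum and table name need to change (host and table values are placeholders):

    import scala.collection.JavaConverters._
    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.S2Graph
    import org.apache.s2graph.core.S2GraphConfigs._

    val overrides = Map[String, AnyRef](
      HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "zk1.example.com",
      HBaseConfigs.HBASE_TABLE_NAME -> "s2graph_dev"
    )
    // Everything not overridden falls back to the defaults shown above.
    val config = ConfigFactory.parseMap(overrides.asJava).withFallback(S2Graph.DefaultConfig)
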
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
new file mode 100644
index 0000000..445b9f6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
@@ -0,0 +1,103 @@
+package org.apache.s2graph.core
+
+object S2GraphConfigs {
+
+  val S2GRAPH_STORE_BACKEND = "s2graph.storage.backend"
+  val DEFAULT_S2GRAPH_STORE_BACKEND = "hbase"
+
+  val PHASE = "phase"
+  val DEFAULT_PHASE = "dev"
+
+  object HBaseConfigs {
+    val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+    val DEFAULT_HBASE_ZOOKEEPER_QUORUM = "localhost"
+
+    val HBASE_ZOOKEEPER_ZNODE_PARENT = "hbase.zookeeper.znode.parent"
+    val DEFAULT_HBASE_ZOOKEEPER_ZNODE_PARENT = "/hbase"
+
+    val HBASE_TABLE_NAME = "hbase.table.name"
+    val DEFAULT_HBASE_TABLE_NAME = "s2graph"
+
+    val HBASE_TABLE_COMPRESSION_ALGORITHM = "hbase.table.compression.algorithm"
+    val DEFAULT_HBASE_TABLE_COMPRESSION_ALGORITHM = "gz"
+
+    val HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number"
+    val DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = java.lang.Integer.valueOf(20)
+
+    val HBASE_RPCS_BUFFERED_FLUSH_INTERVAL = "hbase.rpcs.buffered_flush_interval"
+    val DEFAULT_HBASE_RPCS_BUFFERED_FLUSH_INTERVAL = java.lang.Short.valueOf(100.toShort)
+
+    val HBASE_RPC_TIMEOUT = "hbase.rpc.timeout"
+    val DEFAULT_HBASE_RPC_TIMEOUT = java.lang.Integer.valueOf(600000)
+  }
+  object DBConfigs {
+    val DB_DEFAULT_DRIVER = "db.default.driver"
+    val DEFAULT_DB_DEFAULT_DRIVER = "org.h2.Driver"
+
+    val DB_DEFAULT_URL = "db.default.url"
+    val DEFAULT_DB_DEFAULT_URL = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+
+    val DB_DEFAULT_PASSWORD = "db.default.password"
+    val DEFAULT_DB_DEFAULT_PASSWORD = "graph"
+
+    val DB_DEFAULT_USER = "db.default.user"
+    val DEFAULT_DB_DEFAULT_USER = "graph"
+  }
+  object CacheConfigs {
+    val CACHE_MAX_SIZE = "cache.max.size"
+    val DEFAULT_CACHE_MAX_SIZE = java.lang.Integer.valueOf(0)
+
+    val CACHE_TTL_SECONDS = "cache.ttl.seconds"
+    val DEFAULT_CACHE_TTL_SECONDS = java.lang.Integer.valueOf(-1)
+  }
+  object ResourceCacheConfigs {
+    val RESOURCE_CACHE_MAX_SIZE = "resource.cache.max.size"
+    val DEFAULT_RESOURCE_CACHE_MAX_SIZE = java.lang.Integer.valueOf(1000)
+
+    val RESOURCE_CACHE_TTL_SECONDS = "resource.cache.ttl.seconds"
+    val DEFAULT_RESOURCE_CACHE_TTL_SECONDS = java.lang.Integer.valueOf(-1)
+  }
+  object MutatorConfigs {
+    val MAX_RETRY_NUMBER = "max.retry.number"
+    val DEFAULT_MAX_RETRY_NUMBER = java.lang.Integer.valueOf(100)
+
+    val LOCK_EXPIRE_TIME = "lock.expire.time"
+    val DEFAULT_LOCK_EXPIRE_TIME = java.lang.Integer.valueOf(1000 * 60 * 10)
+
+    val MAX_BACK_OFF = "max.back.off"
+    val DEFAULT_MAX_BACK_OFF = java.lang.Integer.valueOf(100)
+
+    val BACK_OFF_TIMEOUT = "back.off.timeout"
+    val DEFAULT_BACK_OFF_TIMEOUT = java.lang.Integer.valueOf(1000)
+
+    val HBASE_FAIL_PROB = "hbase.fail.prob"
+    val DEFAULT_HBASE_FAIL_PROB = java.lang.Double.valueOf(-0.1)
+
+    val DELETE_ALL_FETCH_SIZE = "delete.all.fetch.size"
+    val DEFAULT_DELETE_ALL_FETCH_SIZE = java.lang.Integer.valueOf(1000)
+
+    val DELETE_ALL_FETCH_COUNT = "delete.all.fetch.count"
+    val DEFAULT_DELETE_ALL_FETCH_COUNT = java.lang.Integer.valueOf(200)
+  }
+  object QueryConfigs {
+    val QUERY_HARDLIMIT = "query.hardlimit"
+    val DEFAULT_QUERY_HARDLIMIT = java.lang.Integer.valueOf(100000)
+  }
+  object FutureCacheConfigs {
+    val FUTURE_CACHE_MAX_SIZE = "future.cache.max.size"
+    val DEFAULT_FUTURE_CACHE_MAX_SIZE = java.lang.Integer.valueOf(100000)
+
+    val FUTURE_CACHE_EXPIRE_AFTER_WRITE = "future.cache.expire.after.write"
+    val DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_WRITE = java.lang.Integer.valueOf(10000)
+
+    val FUTURE_CACHE_EXPIRE_AFTER_ACCESS = "future.cache.expire.after.access"
+    val DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_ACCESS = java.lang.Integer.valueOf(5000)
+
+    val FUTURE_CACHE_METRIC_INTERVAL = "future.cache.metric.interval"
+    val DEFAULT_FUTURE_CACHE_METRIC_INTERVAL = java.lang.Integer.valueOf(60000)
+  }
+  object LogConfigs {
+    val QUERY_LOG_SAMPLE_RATE = "query.log.sample.rate"
+    val DEFAULT_QUERY_LOG_SAMPLE_RATE = Double.box(0.05)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b1b3bc2..1818d10 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -52,15 +52,19 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     regionLocator.getStartKeys
   }

-  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
+  def toHBaseConfig(zkQuorum: String, tableName: String): Configuration = {
     val hbaseConf = HBaseConfiguration.create()

-    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
-    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

     hbaseConf
   }

+  def toHBaseConfig(options: GraphFileOptions): Configuration = {
+    toHBaseConfig(options.zkQuorum, options.tableName)
+  }
+
   def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
     val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
     val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
@@ -88,14 +92,28 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
                     options: GraphFileOptions): Unit = {
     val hbaseConfig = toHBaseConfig(options)

+    generateHFile(sc, s2Config, kvs, hbaseConfig, options.tableName,
+      options.numRegions, options.output, options.incrementalLoad, options.compressionAlgorithm)
+  }
+
+  def generateHFile(sc: SparkContext,
+                    s2Config: Config,
+                    kvs: RDD[KeyValue],
+                    hbaseConfig: Configuration,
+                    tableName: String,
+                    numRegions: Int,
+                    outputPath: String,
+                    incrementalLoad: Boolean = false,
+                    compressionAlgorithm: String = "lz4"): Unit = {
+    val table = TableName.valueOf(tableName)
     val startKeys =
-      if (options.incrementalLoad) {
+      if (incrementalLoad) {
         // need hbase connection to existing table to figure out the ranges of regions.
-        getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
+        getTableStartKeys(hbaseConfig, table)
       } else {
         // otherwise we do not need to initialize Connection to hbase cluster.
         // only numRegions determine region's pre-split.
-        getStartKeys(numRegions = options.numRegions)
+        getStartKeys(numRegions = numRegions)
       }

     val hbaseSc = new HBaseContext(sc, hbaseConfig)
@@ -106,21 +124,21 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
       Seq((k -> v)).toIterator
     }

-    val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
+    val compressionAlgorithmClass = Algorithm.valueOf(compressionAlgorithm).getName.toUpperCase
     val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
       BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
     val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)

-    hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.output, familyOptionsMap.asJava)
+    hbaseSc.bulkLoad(kvs, table, startKeys, flatMap, outputPath, familyOptionsMap.asJava)
   }

   override def generate(sc: SparkContext,
                         config: Config,
                         rdd: RDD[String],
                         options: GraphFileOptions): Unit = {
-    val transformer = new SparkBulkLoaderTransformer(config, options)
+    val transformer = new SparkBulkLoaderTransformer(config, options.labelMapping, options.buildDegree)

     implicit val reader = new TsvBulkFormatReader
     implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
@@ -130,11 +148,14 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     HFileGenerator.generateHFile(sc, config, kvs, options)
   }

-  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
+  def loadIncrementalHFiles(inputPath: String, tableName: String): Int = {
     /* LoadIncrementHFiles */
-    val hfileArgs = Array(options.output, options.tableName)
     val hbaseConfig = HBaseConfiguration.create()
-    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), Array(inputPath, tableName))
+  }
+
+  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
+    loadIncrementalHFiles(options.output, options.tableName)
+  }

   def tableSnapshotDump(ss: SparkSession,

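With the parameter-based overloads above, HFile generation no longer requires a GraphFileOptions instance. A rough sketch of the new entry points (quorum, table name, and paths are placeholders; sc: SparkContext, graphConfig: Config, and kvs: RDD[KeyValue] are assumed to be in scope):

    import org.apache.s2graph.s2jobs.loader.HFileGenerator

    val hbaseConfig = HFileGenerator.toHBaseConfig("zk1.example.com", "s2graph_bulk")

    // Write HFiles to a temp dir, pre-split into 10 regions, lz4-compressed.
    HFileGenerator.generateHFile(sc, graphConfig, kvs, hbaseConfig,
      tableName = "s2graph_bulk", numRegions = 10, outputPath = "/tmp/hfiles",
      incrementalLoad = false, compressionAlgorithm = "lz4")

    // Hand the generated files over to HBase.
    HFileGenerator.loadIncrementalHFiles("/tmp/hfiles", "s2graph_bulk")
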
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 9a2e81a..ff4c955 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -103,7 +103,7 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
              s2Config: Config,
              input: RDD[String],
              options: GraphFileOptions): RDD[KeyValue] = {
-    val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+    val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)

     implicit val reader = new TsvBulkFormatReader
     implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index 68f9a0e..7f6f7c1 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -28,12 +28,13 @@ import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag

 class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
+                                 val labelMapping: Map[String, String] = Map.empty,
+                                 val buildDegree: Boolean = false)(implicit ec: ExecutionContext) extends Transformer[Seq] {
   val s2: S2Graph = S2GraphHelper.getS2Graph(config)

   override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
-      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      DegreeKey.fromGraphElement(s2, element, labelMapping).map(_ -> 1L)
     }.groupBy(_._1).mapValues(_.map(_._2).sum)

     degrees.toSeq.map { case (degreeKey, count) =>
@@ -44,7 +45,7 @@ class LocalBulkLoaderTransformer(val config: Config,
   override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = {
     val elements = input.flatMap(reader.read(s2)(_))
     val kvs = elements.map(writer.write(s2)(_))
-    val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil
+    val degrees = if (buildDegree) buildDegrees[T](elements) else Nil

     kvs ++ degrees
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index bf3d25c..b6292c4 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -31,8 +31,8 @@ class SparkBulkLoaderTransformer(val config: Config,
                                  val labelMapping: Map[String, String] = Map.empty,
                                  val buildDegree: Boolean = false) extends Transformer[RDD] {

-  def this(config: Config, options: GraphFileOptions) =
-    this(config, options.labelMapping, options.buildDegree)
+//  def this(config: Config, options: GraphFileOptions) =
+//    this(config, options.labelMapping, options.buildDegree)

   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>

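Both transformers now take the label mapping and degree flag directly, so callers no longer need a GraphFileOptions just to transform elements. A small sketch (graphConfig: Config is assumed to be in scope; the mapping is illustrative):

    import scala.concurrent.ExecutionContext
    import org.apache.s2graph.s2jobs.loader.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer}

    val labelMapping = Map("friend" -> "friend_v2")

    // Spark path: RDD-based transform.
    val sparkTransformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree = true)

    // Local path: Seq-based transform, needs an ExecutionContext.
    implicit val ec: ExecutionContext = ExecutionContext.global
    val localTransformer = new LocalBulkLoaderTransformer(graphConfig, labelMapping, buildDegree = true)
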
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index ae88b6d..8c88ee8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -23,7 +23,7 @@ import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.{GraphUtil, Management}
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
@@ -34,7 +34,7 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL

 import scala.collection.mutable.ListBuffer
-import scala.concurrent.Await
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration

 /**
@@ -210,34 +210,55 @@ class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   * @param conf
   */
 class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+  import scala.collection.JavaConversions._
+  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+  import org.apache.s2graph.core.S2GraphConfigs._
+
   override def mandatoryOptions: Set[String] = Set()

   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"

   private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
-    val options = TaskConf.toGraphFileOptions(conf)
-    val config = Management.toConfig(TaskConf.parseLocalCacheConfigs(conf) ++
-      TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ options.toConfigParams)
+    val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
+    val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
+
+    // required for bulkload
+    val labelMapping = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_LABEL_MAPPING).map(GraphUtil.toLabelMapping).getOrElse(Map.empty)
+    val buildDegree = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_BUILD_DEGREE).map(_.toBoolean).getOrElse(false)
+    val autoEdgeCreate = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_AUTO_EDGE_CREATE).map(_.toBoolean).getOrElse(false)
+    val skipError = getConfigStringOpt(graphConfig, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false)
+
+    val zkQuorum = graphConfig.getString(HBaseConfigs.HBASE_ZOOKEEPER_QUORUM)
+    val tableName = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TABLE_NAME)
+
+    val numRegions = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_NUM_REGIONS).toInt
+    val outputPath = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TEMP_DIR)
+
+    // optional.
+    val incrementalLoad = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD).map(_.toBoolean).getOrElse(false)
+    val compressionAlgorithm = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_COMPRESSION).getOrElse("lz4")
+
+    val hbaseConfig = HFileGenerator.toHBaseConfig(zkQuorum, tableName)

     val input = df.rdd

-    val transformer = new SparkBulkLoaderTransformer(config, options)
+    val transformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree)

     implicit val reader = new RowBulkFormatReader
-    implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+    implicit val writer = new KeyValueWriter(autoEdgeCreate, skipError)

     val kvs = transformer.transform(input)

-    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, graphConfig,
+      kvs.flatMap(ls => ls), hbaseConfig, tableName,
+      numRegions, outputPath, incrementalLoad, compressionAlgorithm
+    )

     // finish bulk load by execute LoadIncrementHFile.
-    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options)
+    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(outputPath, tableName)
   }

   private def writeBatchWithMutate(df:DataFrame):Unit = {
-    import scala.collection.JavaConversions._
-    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
-
     // TODO: FIX THIS. overwrite local cache config.
     val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
     val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
@@ -247,6 +268,7 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
     val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
     val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+    val skipError = getConfigStringOpt(graphConfig, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false)

     df.foreachPartition { iters =>
       val config = ConfigFactory.parseString(serializedConfig)
@@ -255,7 +277,15 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
       val responses = iters.grouped(groupedSize).flatMap { rows =>
         val elements = rows.flatMap(row => reader.read(s2Graph)(row))

-        val mutateF = s2Graph.mutateElements(elements, true)
+        val mutateF = if (skipError) {
+          try {
+            s2Graph.mutateElements(elements, true)
+          } catch {
+            case e: Throwable => Future.successful(Nil)
+          }
+        } else {
+          s2Graph.mutateElements(elements, true)
+        }
         Await.result(mutateF, Duration(waitTime, "seconds"))
       }

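Put together, writeBatchBulkload reads everything it needs from the task options. A sketch of a TaskConf that could drive it, using the key constants added to S2SinkConfigs below (names and values are illustrative; table name, region count, and temp dir are read unconditionally, the rest are optional):

    import org.apache.s2graph.s2jobs.task.TaskConf
    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
    import org.apache.s2graph.core.S2GraphConfigs.HBaseConfigs

    val bulkloadSinkConf = new TaskConf("bulkload_sink", "sink", options = Map(
      HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "zk1.example.com",
      S2_SINK_BULKLOAD_HBASE_TABLE_NAME -> "s2graph_bulk",
      S2_SINK_BULKLOAD_HBASE_NUM_REGIONS -> "10",
      S2_SINK_BULKLOAD_HBASE_TEMP_DIR -> "/tmp/bulkload_hfiles",
      S2_SINK_BULKLOAD_LABEL_MAPPING -> "friend:friend_v2",
      S2_SINK_BULKLOAD_BUILD_DEGREE -> "true"
    ))
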
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
index 6ef0111..d778478 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
@@ -24,14 +24,23 @@ import com.typesafe.config.Config
 import scala.util.Try

 object S2SinkConfigs {
-  val DB_DEFAULT_URL = "db.default.url"
-  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+  // Common
+
+  // meta storage Configurations.
+//  val DB_DEFAULT_DRIVER = "db.default.driver"
+//  val DB_DEFAULT_URL = "db.default.url"
+//  val DB_DEFAULT_PASSWORD = "db.default.password"
+//  val DB_DEFAULT_USER = "db.default.user"
+//
+//  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"

   val DEFAULT_GROUPED_SIZE = "100"
   val DEFAULT_WAIT_TIME_SECONDS = "5"

   val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink"
+  val S2_SINK_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.sink"
+
   val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname"
   val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath"
   val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation"
@@ -40,6 +49,18 @@ object S2SinkConfigs {
   val S2_SINK_COMPACT_INTERVAL = s"$S2_SINK_PREFIX.compact.interval"
   val S2_SINK_GROUPED_SIZE = s"$S2_SINK_PREFIX.grouped.size"
   val S2_SINK_WAIT_TIME = s"$S2_SINK_PREFIX.wait.time"
+  val S2_SINK_SKIP_ERROR = s"$S2_SINK_PREFIX.skip.error"
+
+  // BULKLOAD
+  val S2_SINK_BULKLOAD_HBASE_TABLE_NAME = s"$S2_SINK_BULKLOAD_PREFIX.hbase.table.name"
+  val S2_SINK_BULKLOAD_HBASE_NUM_REGIONS = s"$S2_SINK_BULKLOAD_PREFIX.hbase.table.num.regions"
+  val S2_SINK_BULKLOAD_HBASE_TEMP_DIR = s"$S2_SINK_BULKLOAD_PREFIX.hbase.temp.dir"
+  val S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD = s"$S2_SINK_BULKLOAD_PREFIX.hbase.incrementalLoad"
+  val S2_SINK_BULKLOAD_HBASE_COMPRESSION = s"$S2_SINK_BULKLOAD_PREFIX.hbase.compression"
+
+  val S2_SINK_BULKLOAD_AUTO_EDGE_CREATE = s"$S2_SINK_BULKLOAD_PREFIX.auto.edge.create"
+  val S2_SINK_BULKLOAD_BUILD_DEGREE = s"$S2_SINK_BULKLOAD_PREFIX.build.degree"
+  val S2_SINK_BULKLOAD_LABEL_MAPPING = s"$S2_SINK_BULKLOAD_PREFIX.label.mapping"

   def getConfigStringOpt(config:Config, path:String): Option[String] = Try(config.getString(path)).toOption

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 1f93174..f97dce4 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -27,12 +27,18 @@ import org.apache.s2graph.core.schema.{Label, Service, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

 import scala.util.Try

 class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
-
+  import org.apache.s2graph.core.S2GraphConfigs._
+  import S2SinkConfigs._
+  protected val bulkloadOptions = new TaskConf("bulkloadOptions", "test", options = Map(
+    DBConfigs.DEFAULT_DB_DEFAULT_URL -> "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL"
+  ))
   protected val options = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 872b3f4..17f087d 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -41,7 +41,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
     transformerMode match {
       case "spark" =>
         val input: RDD[String] = sc.parallelize(edges)
-        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)

         implicit val reader = new TsvBulkFormatReader
         implicit val writer = new KeyValueWriter
@@ -55,7 +55,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       case "local" =>
         val input = edges
-        val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+        val transformer = new LocalBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)

         implicit val reader = new TsvBulkFormatReader
         implicit val writer = new KeyValueWriter
@@ -82,7 +82,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
         e.getDirection())
     }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd

-    val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+    val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)

     implicit val reader = new RowBulkFormatReader
     implicit val writer = new KeyValueWriter
