Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 558264490 -> c30531bcd


Extract default configuration keys into S2GraphConfigs and decouple the bulkload path (HFileGenerator, bulk loader transformers, S2GraphSink) from GraphFileOptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e42f7b22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e42f7b22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e42f7b22

Branch: refs/heads/master
Commit: e42f7b22f26c7d725f9a33ff6badd7942768e478
Parents: 32eb344
Author: DO YUNG YOON <[email protected]>
Authored: Thu Jun 14 19:20:51 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Jun 14 19:20:51 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/GraphUtil.scala     |   9 ++
 .../scala/org/apache/s2graph/core/S2Graph.scala |  65 ++++++------
 .../apache/s2graph/core/S2GraphConfigs.scala    | 103 +++++++++++++++++++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  45 +++++---
 .../s2jobs/loader/HFileMRGenerator.scala        |   2 +-
 .../loader/LocalBulkLoaderTransformer.scala     |   7 +-
 .../loader/SparkBulkLoaderTransformer.scala     |   4 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  56 +++++++---
 .../spark/sql/streaming/S2SinkConfigs.scala     |  25 ++++-
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |   8 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  |   6 +-
 11 files changed, 259 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
index e08bb4e..7f74032 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -171,4 +171,13 @@ object GraphUtil {
   def stringToOption(s: String): Option[String] = {
     Option(s).filter(_.trim.nonEmpty)
   }
+
+  def toLabelMapping(labelMapping: String): Map[String, String] = {
+    (for {
+      token <- labelMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
 }
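For reference, a minimal sketch of how the new helper behaves; the "from:to" pairs separated by commas are inferred from the split logic above, and the label names are placeholders:

    GraphUtil.toLabelMapping("friend:friend_new,like:like_new")
    // => Map("friend" -> "friend_new", "like" -> "like_new")
    GraphUtil.toLabelMapping("friend")  // tokens without exactly one ':' are dropped => Map()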

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index d41bc24..dabbf80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -51,42 +51,39 @@ object S2Graph {
   val DefaultScore = 1.0
   val FetchAllLimit = 10000000
   val DefaultFetchLimit = 1000
+  import S2GraphConfigs._
 
   private val DefaultConfigs: Map[String, AnyRef] = Map(
-    "hbase.zookeeper.quorum" -> "localhost",
-    "hbase.table.name" -> "s2graph",
-    "hbase.table.compression.algorithm" -> "gz",
-    "phase" -> "dev",
-    "db.default.driver" -> "org.h2.Driver",
-    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-    "db.default.password" -> "graph",
-    "db.default.user" -> "graph",
-    "cache.max.size" -> java.lang.Integer.valueOf(0),
-    "cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
-    "resource.cache.max.size" -> java.lang.Integer.valueOf(1000),
-    "resource.cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
-    "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
-    "hbase.rpcs.buffered_flush_interval" -> 
java.lang.Short.valueOf(100.toShort),
-    "hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000),
-    "max.retry.number" -> java.lang.Integer.valueOf(100),
-    "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
-    "max.back.off" -> java.lang.Integer.valueOf(100),
-    "back.off.timeout" -> java.lang.Integer.valueOf(1000),
-    "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
-    "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
-    "delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
-    "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
-    "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
-    "query.future.cache.expire.after.write" -> 
java.lang.Integer.valueOf(10000),
-    "query.future.cache.expire.after.access" -> 
java.lang.Integer.valueOf(5000),
-    "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "s2graph.storage.backend" -> "hbase",
-    "query.hardlimit" -> java.lang.Integer.valueOf(100000),
-    "hbase.zookeeper.znode.parent" -> "/hbase",
-    "query.log.sample.rate" -> Double.box(0.05)
+    S2GRAPH_STORE_BACKEND -> DEFAULT_S2GRAPH_STORE_BACKEND,
+    PHASE -> DEFAULT_PHASE,
+    HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> HBaseConfigs.DEFAULT_HBASE_ZOOKEEPER_QUORUM,
+    HBaseConfigs.HBASE_ZOOKEEPER_ZNODE_PARENT -> HBaseConfigs.DEFAULT_HBASE_ZOOKEEPER_ZNODE_PARENT,
+    HBaseConfigs.HBASE_TABLE_NAME -> HBaseConfigs.DEFAULT_HBASE_TABLE_NAME,
+    HBaseConfigs.HBASE_TABLE_COMPRESSION_ALGORITHM -> HBaseConfigs.DEFAULT_HBASE_TABLE_COMPRESSION_ALGORITHM,
+    HBaseConfigs.HBASE_CLIENT_RETRIES_NUMBER -> HBaseConfigs.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER,
+    HBaseConfigs.HBASE_RPCS_BUFFERED_FLUSH_INTERVAL -> HBaseConfigs.DEFAULT_HBASE_RPCS_BUFFERED_FLUSH_INTERVAL,
+    HBaseConfigs.HBASE_RPC_TIMEOUT -> HBaseConfigs.DEFAULT_HBASE_RPC_TIMEOUT,
+    DBConfigs.DB_DEFAULT_DRIVER -> DBConfigs.DEFAULT_DB_DEFAULT_DRIVER,
+    DBConfigs.DB_DEFAULT_URL -> DBConfigs.DEFAULT_DB_DEFAULT_URL,
+    DBConfigs.DB_DEFAULT_PASSWORD -> DBConfigs.DEFAULT_DB_DEFAULT_PASSWORD,
+    DBConfigs.DB_DEFAULT_USER -> DBConfigs.DEFAULT_DB_DEFAULT_USER,
+    CacheConfigs.CACHE_MAX_SIZE -> CacheConfigs.DEFAULT_CACHE_MAX_SIZE,
+    CacheConfigs.CACHE_TTL_SECONDS -> CacheConfigs.DEFAULT_CACHE_TTL_SECONDS,
+    ResourceCacheConfigs.RESOURCE_CACHE_MAX_SIZE -> ResourceCacheConfigs.DEFAULT_RESOURCE_CACHE_MAX_SIZE,
+    ResourceCacheConfigs.RESOURCE_CACHE_TTL_SECONDS -> ResourceCacheConfigs.DEFAULT_RESOURCE_CACHE_TTL_SECONDS,
+    MutatorConfigs.MAX_RETRY_NUMBER -> MutatorConfigs.DEFAULT_MAX_RETRY_NUMBER,
+    MutatorConfigs.LOCK_EXPIRE_TIME -> MutatorConfigs.DEFAULT_LOCK_EXPIRE_TIME,
+    MutatorConfigs.MAX_BACK_OFF -> MutatorConfigs.DEFAULT_MAX_BACK_OFF,
+    MutatorConfigs.BACK_OFF_TIMEOUT -> MutatorConfigs.DEFAULT_BACK_OFF_TIMEOUT,
+    MutatorConfigs.HBASE_FAIL_PROB -> MutatorConfigs.DEFAULT_HBASE_FAIL_PROB,
+    MutatorConfigs.DELETE_ALL_FETCH_SIZE -> MutatorConfigs.DEFAULT_DELETE_ALL_FETCH_SIZE,
+    MutatorConfigs.DELETE_ALL_FETCH_COUNT -> MutatorConfigs.DEFAULT_DELETE_ALL_FETCH_COUNT,
+    FutureCacheConfigs.FUTURE_CACHE_MAX_SIZE -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_MAX_SIZE,
+    FutureCacheConfigs.FUTURE_CACHE_EXPIRE_AFTER_WRITE -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_WRITE,
+    FutureCacheConfigs.FUTURE_CACHE_EXPIRE_AFTER_ACCESS -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_ACCESS,
+    FutureCacheConfigs.FUTURE_CACHE_METRIC_INTERVAL -> FutureCacheConfigs.DEFAULT_FUTURE_CACHE_METRIC_INTERVAL,
+    QueryConfigs.QUERY_HARDLIMIT -> QueryConfigs.DEFAULT_QUERY_HARDLIMIT,
+    LogConfigs.QUERY_LOG_SAMPLE_RATE -> LogConfigs.DEFAULT_QUERY_LOG_SAMPLE_RATE
   )
 
   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
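A hedged usage sketch of the refactored defaults (assuming the usual typesafe-config fallback pattern; the key constant comes from S2GraphConfigs, the table name is illustrative):

    import com.typesafe.config.ConfigFactory
    import scala.collection.JavaConverters._
    import org.apache.s2graph.core.S2GraphConfigs._

    // override only the keys that differ; everything else falls back to DefaultConfig
    val userConfig = ConfigFactory.parseMap(Map(HBaseConfigs.HBASE_TABLE_NAME -> "my_graph").asJava)
    val merged = userConfig.withFallback(S2Graph.DefaultConfig)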

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
new file mode 100644
index 0000000..445b9f6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
@@ -0,0 +1,103 @@
+package org.apache.s2graph.core
+
+object S2GraphConfigs {
+
+  val S2GRAPH_STORE_BACKEND = "s2graph.storage.backend"
+  val DEFAULT_S2GRAPH_STORE_BACKEND = "hbase"
+
+  val PHASE = "phase"
+  val DEFAULT_PHASE = "dev"
+
+  object HBaseConfigs {
+    val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+    val DEFAULT_HBASE_ZOOKEEPER_QUORUM = "localhost"
+
+    val HBASE_ZOOKEEPER_ZNODE_PARENT = "hbase.zookeeper.znode.parent"
+    val DEFAULT_HBASE_ZOOKEEPER_ZNODE_PARENT = "/hbase"
+
+    val HBASE_TABLE_NAME = "hbase.table.name"
+    val DEFAULT_HBASE_TABLE_NAME = "s2graph"
+
+    val HBASE_TABLE_COMPRESSION_ALGORITHM = "hbase.table.compression.algorithm"
+    val DEFAULT_HBASE_TABLE_COMPRESSION_ALGORITHM = "gz"
+
+    val HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number"
+    val DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = java.lang.Integer.valueOf(20)
+
+    val HBASE_RPCS_BUFFERED_FLUSH_INTERVAL = "hbase.rpcs.buffered_flush_interval"
+    val DEFAULT_HBASE_RPCS_BUFFERED_FLUSH_INTERVAL = java.lang.Short.valueOf(100.toShort)
+
+    val HBASE_RPC_TIMEOUT = "hbase.rpc.timeout"
+    val DEFAULT_HBASE_RPC_TIMEOUT = java.lang.Integer.valueOf(600000)
+  }
+  object DBConfigs {
+    val DB_DEFAULT_DRIVER = "db.default.driver"
+    val DEFAULT_DB_DEFAULT_DRIVER = "org.h2.Driver"
+
+    val DB_DEFAULT_URL = "db.default.url"
+    val DEFAULT_DB_DEFAULT_URL = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+
+    val DB_DEFAULT_PASSWORD = "db.default.password"
+    val DEFAULT_DB_DEFAULT_PASSWORD = "graph"
+
+    val DB_DEFAULT_USER = "db.default.user"
+    val DEFAULT_DB_DEFAULT_USER = "graph"
+  }
+  object CacheConfigs {
+    val CACHE_MAX_SIZE = "cache.max.size"
+    val DEFAULT_CACHE_MAX_SIZE = java.lang.Integer.valueOf(0)
+
+    val CACHE_TTL_SECONDS = "cache.ttl.seconds"
+    val DEFAULT_CACHE_TTL_SECONDS = java.lang.Integer.valueOf(-1)
+  }
+  object ResourceCacheConfigs {
+    val RESOURCE_CACHE_MAX_SIZE = "resource.cache.max.size"
+    val DEFAULT_RESOURCE_CACHE_MAX_SIZE = java.lang.Integer.valueOf(1000)
+
+    val RESOURCE_CACHE_TTL_SECONDS = "resource.cache.ttl.seconds"
+    val DEFAULT_RESOURCE_CACHE_TTL_SECONDS = java.lang.Integer.valueOf(-1)
+  }
+  object MutatorConfigs {
+    val MAX_RETRY_NUMBER = "max.retry.number"
+    val DEFAULT_MAX_RETRY_NUMBER = java.lang.Integer.valueOf(100)
+
+    val LOCK_EXPIRE_TIME = "lock.expire.time"
+    val DEFAULT_LOCK_EXPIRE_TIME = java.lang.Integer.valueOf(1000 * 60 * 10)
+
+    val MAX_BACK_OFF = "max.back.off"
+    val DEFAULT_MAX_BACK_OFF = java.lang.Integer.valueOf(100)
+
+    val BACK_OFF_TIMEOUT = "back.off.timeout"
+    val DEFAULT_BACK_OFF_TIMEOUT = java.lang.Integer.valueOf(1000)
+
+    val HBASE_FAIL_PROB = "hbase.fail.prob"
+    val DEFAULT_HBASE_FAIL_PROB = java.lang.Double.valueOf(-0.1)
+
+    val DELETE_ALL_FETCH_SIZE = "delete.all.fetch.size"
+    val DEFAULT_DELETE_ALL_FETCH_SIZE = java.lang.Integer.valueOf(1000)
+
+    val DELETE_ALL_FETCH_COUNT = "delete.all.fetch.count"
+    val DEFAULT_DELETE_ALL_FETCH_COUNT = java.lang.Integer.valueOf(200)
+  }
+  object QueryConfigs {
+    val QUERY_HARDLIMIT = "query.hardlimit"
+    val DEFAULT_QUERY_HARDLIMIT = java.lang.Integer.valueOf(100000)
+  }
+  object FutureCacheConfigs {
+    val FUTURE_CACHE_MAX_SIZE = "future.cache.max.size"
+    val DEFAULT_FUTURE_CACHE_MAX_SIZE = java.lang.Integer.valueOf(100000)
+
+    val FUTURE_CACHE_EXPIRE_AFTER_WRITE = "future.cache.expire.after.write"
+    val DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_WRITE = java.lang.Integer.valueOf(10000)
+
+    val FUTURE_CACHE_EXPIRE_AFTER_ACCESS = "future.cache.expire.after.access"
+    val DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_ACCESS = java.lang.Integer.valueOf(5000)
+
+    val FUTURE_CACHE_METRIC_INTERVAL = "future.cache.metric.interval"
+    val DEFAULT_FUTURE_CACHE_METRIC_INTERVAL = java.lang.Integer.valueOf(60000)
+  }
+  object LogConfigs {
+    val QUERY_LOG_SAMPLE_RATE = "query.log.sample.rate"
+    val DEFAULT_QUERY_LOG_SAMPLE_RATE = Double.box(0.05)
+  }
+}
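As a quick illustration of using the new constants at a call site instead of string literals (config below stands for any com.typesafe.config.Config; names other than the constants are hypothetical):

    import org.apache.s2graph.core.S2GraphConfigs._

    val tableName = config.getString(HBaseConfigs.HBASE_TABLE_NAME)  // "hbase.table.name"
    val cacheTtl =
      if (config.hasPath(CacheConfigs.CACHE_TTL_SECONDS)) config.getInt(CacheConfigs.CACHE_TTL_SECONDS)
      else CacheConfigs.DEFAULT_CACHE_TTL_SECONDS.intValue()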

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b1b3bc2..1818d10 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -52,15 +52,19 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     regionLocator.getStartKeys
   }
 
-  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
+  def toHBaseConfig(zkQuorum: String, tableName: String): Configuration = {
     val hbaseConf = HBaseConfiguration.create()
 
-    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
-    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 
     hbaseConf
   }
 
+  def toHBaseConfig(options: GraphFileOptions): Configuration = {
+    toHBaseConfig(options.zkQuorum, options.tableName)
+  }
+
   def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
     val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
     val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
@@ -88,14 +92,28 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
                     options: GraphFileOptions): Unit = {
     val hbaseConfig = toHBaseConfig(options)
 
+    generateHFile(sc, s2Config, kvs, hbaseConfig, options.tableName,
+      options.numRegions, options.output, options.incrementalLoad, options.compressionAlgorithm)
+  }
+
+  def generateHFile(sc: SparkContext,
+                    s2Config: Config,
+                    kvs: RDD[KeyValue],
+                    hbaseConfig: Configuration,
+                    tableName: String,
+                    numRegions: Int,
+                    outputPath: String,
+                    incrementalLoad: Boolean = false,
+                    compressionAlgorithm: String = "lz4"): Unit = {
+    val table = TableName.valueOf(tableName)
     val startKeys =
-      if (options.incrementalLoad) {
+      if (incrementalLoad) {
        // need hbase connection to existing table to figure out the ranges of regions.
-        getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
+        getTableStartKeys(hbaseConfig, table)
       } else {
         // otherwise we do not need to initialize Connection to hbase cluster.
         // only numRegions determine region's pre-split.
-        getStartKeys(numRegions = options.numRegions)
+        getStartKeys(numRegions = numRegions)
       }
 
     val hbaseSc = new HBaseContext(sc, hbaseConfig)
@@ -106,21 +124,21 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
       Seq((k -> v)).toIterator
     }
 
-    val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
+    val compressionAlgorithmClass = Algorithm.valueOf(compressionAlgorithm).getName.toUpperCase
     val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
       BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
 
     val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
 
 
-    hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.output, familyOptionsMap.asJava)
+    hbaseSc.bulkLoad(kvs, table, startKeys, flatMap, outputPath, familyOptionsMap.asJava)
   }
 
   override def generate(sc: SparkContext,
                         config: Config,
                         rdd: RDD[String],
                         options: GraphFileOptions): Unit = {
-    val transformer = new SparkBulkLoaderTransformer(config, options)
+    val transformer = new SparkBulkLoaderTransformer(config, options.labelMapping, options.buildDegree)
 
     implicit val reader = new TsvBulkFormatReader
     implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
@@ -130,11 +148,14 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     HFileGenerator.generateHFile(sc, config, kvs, options)
   }
 
-  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
+  def loadIncrementalHFiles(inputPath: String, tableName: String): Int = {
     /* LoadIncrementHFiles */
-    val hfileArgs = Array(options.output, options.tableName)
     val hbaseConfig = HBaseConfiguration.create()
-    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+    ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), Array(inputPath, tableName))
+  }
+
+  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
+    loadIncrementalHFiles(options.output, options.tableName)
   }
 
   def tableSnapshotDump(ss: SparkSession,
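For callers without a GraphFileOptions, a rough sketch of driving the new explicit-parameter overloads directly (the ZooKeeper quorum, table name, and paths below are placeholders, not values from this commit):

    val hbaseConf = HFileGenerator.toHBaseConfig("zk1.example.com", "s2graph")
    HFileGenerator.generateHFile(sc, s2Config, kvs, hbaseConf, tableName = "s2graph",
      numRegions = 3, outputPath = "/tmp/s2graph_hfiles",
      incrementalLoad = false, compressionAlgorithm = "lz4")
    HFileGenerator.loadIncrementalHFiles("/tmp/s2graph_hfiles", "s2graph")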

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 9a2e81a..ff4c955 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -103,7 +103,7 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
                s2Config: Config,
                input: RDD[String],
                options: GraphFileOptions): RDD[KeyValue] = {
-    val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+    val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)
 
     implicit val reader = new TsvBulkFormatReader
     implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index 68f9a0e..7f6f7c1 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -28,12 +28,13 @@ import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
+                                 val labelMapping: Map[String, String] = Map.empty,
+                                 val buildDegree: Boolean = false)(implicit ec: ExecutionContext) extends Transformer[Seq] {
   val s2: S2Graph = S2GraphHelper.getS2Graph(config)
 
   override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
-      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      DegreeKey.fromGraphElement(s2, element, labelMapping).map(_ -> 1L)
     }.groupBy(_._1).mapValues(_.map(_._2).sum)
 
     degrees.toSeq.map { case (degreeKey, count) =>
@@ -44,7 +45,7 @@ class LocalBulkLoaderTransformer(val config: Config,
   override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = {
     val elements = input.flatMap(reader.read(s2)(_))
     val kvs = elements.map(writer.write(s2)(_))
-    val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil
+    val degrees = if (buildDegree) buildDegrees[T](elements) else Nil
 
     kvs ++ degrees
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index bf3d25c..b6292c4 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -31,8 +31,8 @@ class SparkBulkLoaderTransformer(val config: Config,
                                  val labelMapping: Map[String, String] = Map.empty,
                                  val buildDegree: Boolean = false) extends Transformer[RDD] {
 
-  def this(config: Config, options: GraphFileOptions) =
-    this(config, options.labelMapping, options.buildDegree)
+//  def this(config: Config, options: GraphFileOptions) =
+//    this(config, options.labelMapping, options.buildDegree)
 
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
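With the GraphFileOptions-based secondary constructor commented out, callers now pass the bulkload knobs explicitly; a minimal sketch for both transformers (options is an existing GraphFileOptions, and LocalBulkLoaderTransformer additionally needs an implicit ExecutionContext in scope):

    val sparkTransformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)
    val localTransformer = new LocalBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)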

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index ae88b6d..8c88ee8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -23,7 +23,7 @@ import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.{GraphUtil, Management}
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
@@ -34,7 +34,7 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
 import scala.collection.mutable.ListBuffer
-import scala.concurrent.Await
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
 
 /**
@@ -210,34 +210,55 @@ class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   * @param conf
   */
 class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+  import scala.collection.JavaConversions._
+  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+  import org.apache.s2graph.core.S2GraphConfigs._
+
   override def mandatoryOptions: Set[String] = Set()
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
   private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
-    val options = TaskConf.toGraphFileOptions(conf)
-    val config = Management.toConfig(TaskConf.parseLocalCacheConfigs(conf) ++
-      TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ options.toConfigParams)
+    val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
+    val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
+
+    // required for bulkload
+    val labelMapping = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_LABEL_MAPPING).map(GraphUtil.toLabelMapping).getOrElse(Map.empty)
+    val buildDegree = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_BUILD_DEGREE).map(_.toBoolean).getOrElse(false)
+    val autoEdgeCreate = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_AUTO_EDGE_CREATE).map(_.toBoolean).getOrElse(false)
+    val skipError = getConfigStringOpt(graphConfig, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false)
+
+    val zkQuorum = graphConfig.getString(HBaseConfigs.HBASE_ZOOKEEPER_QUORUM)
+    val tableName = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TABLE_NAME)
+
+    val numRegions = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_NUM_REGIONS).toInt
+    val outputPath = graphConfig.getString(S2_SINK_BULKLOAD_HBASE_TEMP_DIR)
+
+    // optional.
+    val incrementalLoad = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD).map(_.toBoolean).getOrElse(false)
+    val compressionAlgorithm = getConfigStringOpt(graphConfig, S2_SINK_BULKLOAD_HBASE_COMPRESSION).getOrElse("lz4")
+
+    val hbaseConfig = HFileGenerator.toHBaseConfig(zkQuorum, tableName)
 
     val input = df.rdd
 
-    val transformer = new SparkBulkLoaderTransformer(config, options)
+    val transformer = new SparkBulkLoaderTransformer(graphConfig, labelMapping, buildDegree)
 
     implicit val reader = new RowBulkFormatReader
-    implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
+    implicit val writer = new KeyValueWriter(autoEdgeCreate, skipError)
 
     val kvs = transformer.transform(input)
 
-    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, graphConfig,
+      kvs.flatMap(ls => ls), hbaseConfig, tableName,
+      numRegions, outputPath, incrementalLoad, compressionAlgorithm
+    )
 
     // finish bulk load by execute LoadIncrementHFile.
-    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options)
+    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(outputPath, tableName)
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
-    import scala.collection.JavaConversions._
-    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
-
     // TODO: FIX THIS. overwrite local cache config.
     val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
     val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
@@ -247,6 +268,7 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
     val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
     val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+    val skipError = getConfigStringOpt(graphConfig, S2_SINK_SKIP_ERROR).map(_.toBoolean).getOrElse(false)
 
     df.foreachPartition { iters =>
       val config = ConfigFactory.parseString(serializedConfig)
@@ -255,7 +277,15 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
       val responses = iters.grouped(groupedSize).flatMap { rows =>
         val elements = rows.flatMap(row => reader.read(s2Graph)(row))
 
-        val mutateF = s2Graph.mutateElements(elements, true)
+        val mutateF = if (skipError) {
+          try {
+            s2Graph.mutateElements(elements, true)
+          } catch {
+            case e: Throwable => Future.successful(Nil)
+          }
+        } else {
+          s2Graph.mutateElements(elements, true)
+        }
         Await.result(mutateF, Duration(waitTime, "seconds"))
       }
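Illustrative only: the bulkload path above is driven entirely by task options, so a TaskConf for this sink might carry keys like the following (the values are placeholders, and the constructor call mirrors the TaskConf usage in the test change below):

    val sinkConf = new TaskConf("to_s2graph", "sink", options = Map(
      "s2.spark.sql.bulkload.sink.hbase.table.name" -> "s2graph",
      "s2.spark.sql.bulkload.sink.hbase.table.num.regions" -> "3",
      "s2.spark.sql.bulkload.sink.hbase.temp.dir" -> "/tmp/s2graph_bulkload",
      "s2.spark.sql.bulkload.sink.label.mapping" -> "friend:friend_new"
    ))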
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
index 6ef0111..d778478 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
@@ -24,14 +24,23 @@ import com.typesafe.config.Config
 import scala.util.Try
 
 object S2SinkConfigs {
-  val DB_DEFAULT_URL = "db.default.url"
-  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+  // Common
+
+  // meta storage Configurations.
+//  val DB_DEFAULT_DRIVER = "db.default.driver"
+//  val DB_DEFAULT_URL = "db.default.url"
+//  val DB_DEFAULT_PASSWORD = "db.default.password"
+//  val DB_DEFAULT_USER = "db.default.user"
+//
+//  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
 
 
   val DEFAULT_GROUPED_SIZE = "100"
   val DEFAULT_WAIT_TIME_SECONDS = "5"
 
   val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink"
+  val S2_SINK_BULKLOAD_PREFIX = "s2.spark.sql.bulkload.sink"
+
   val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname"
   val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath"
   val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation"
@@ -40,6 +49,18 @@ object S2SinkConfigs {
   val S2_SINK_COMPACT_INTERVAL = s"$S2_SINK_PREFIX.compact.interval"
   val S2_SINK_GROUPED_SIZE = s"$S2_SINK_PREFIX.grouped.size"
   val S2_SINK_WAIT_TIME = s"$S2_SINK_PREFIX.wait.time"
+  val S2_SINK_SKIP_ERROR = s"$S2_SINK_PREFIX.skip.error"
+
+  // BULKLOAD
+  val S2_SINK_BULKLOAD_HBASE_TABLE_NAME = s"$S2_SINK_BULKLOAD_PREFIX.hbase.table.name"
+  val S2_SINK_BULKLOAD_HBASE_NUM_REGIONS = s"$S2_SINK_BULKLOAD_PREFIX.hbase.table.num.regions"
+  val S2_SINK_BULKLOAD_HBASE_TEMP_DIR = s"$S2_SINK_BULKLOAD_PREFIX.hbase.temp.dir"
+  val S2_SINK_BULKLOAD_HBASE_INCREMENTAL_LOAD = s"$S2_SINK_BULKLOAD_PREFIX.hbase.incrementalLoad"
+  val S2_SINK_BULKLOAD_HBASE_COMPRESSION = s"$S2_SINK_BULKLOAD_PREFIX.hbase.compression"
+
+  val S2_SINK_BULKLOAD_AUTO_EDGE_CREATE = s"$S2_SINK_BULKLOAD_PREFIX.auto.edge.create"
+  val S2_SINK_BULKLOAD_BUILD_DEGREE = s"$S2_SINK_BULKLOAD_PREFIX.build.degree"
+  val S2_SINK_BULKLOAD_LABEL_MAPPING = s"$S2_SINK_BULKLOAD_PREFIX.label.mapping"
 
   def getConfigStringOpt(config:Config, path:String): Option[String] = Try(config.getString(path)).toOption
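For example, with the prefix above, S2_SINK_BULKLOAD_HBASE_TABLE_NAME resolves to "s2.spark.sql.bulkload.sink.hbase.table.name", and a sink can read it as an optional setting (graphConfig here stands for any typesafe Config):

    val tableNameOpt = S2SinkConfigs.getConfigStringOpt(graphConfig, S2SinkConfigs.S2_SINK_BULKLOAD_HBASE_TABLE_NAME)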
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 1f93174..f97dce4 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -27,12 +27,18 @@ import org.apache.s2graph.core.schema.{Label, Service, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
 
 class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
-
+  import org.apache.s2graph.core.S2GraphConfigs._
+  import S2SinkConfigs._
+  protected val bulkloadOptions = new TaskConf("bulkloadOptions", "test", 
options = Map(
+    DBConfigs.DEFAULT_DB_DEFAULT_URL -> 
"jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL"
+  ))
   protected val options = GraphFileOptions(
     input = "/tmp/test.txt",
     tempDir = "/tmp/bulkload_tmp",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e42f7b22/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 872b3f4..17f087d 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -41,7 +41,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
     transformerMode match {
       case "spark" =>
         val input: RDD[String] = sc.parallelize(edges)
-        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)
 
         implicit val reader = new TsvBulkFormatReader
         implicit val writer = new KeyValueWriter
@@ -55,7 +55,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
 
       case "local" =>
         val input = edges
-        val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+        val transformer = new LocalBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)
 
         implicit val reader = new TsvBulkFormatReader
         implicit val writer = new KeyValueWriter
@@ -82,7 +82,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
             e.getDirection())
         }.toDF("timestamp", "operation", "element", "from", "to", "label", 
"props", "direction").rdd
 
-        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)
 
         implicit val reader = new RowBulkFormatReader
         implicit val writer = new KeyValueWriter

Reply via email to