add runLoadIncrementalHFiles option for S2graphSink.write.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f4bd2842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f4bd2842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f4bd2842

Branch: refs/heads/master
Commit: f4bd2842f7c0bc0b3eb1a7f5467c25e38e4da999
Parents: f2857d1
Author: DO YUNG YOON <[email protected]>
Authored: Wed Apr 4 11:31:26 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Apr 4 11:31:26 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/loader/HFileGenerator.scala   | 2 +-
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala    | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4bd2842/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index da190ee..e7535d4 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -125,7 +125,7 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     HFileGenerator.generateHFile(sc, config, kvs, _options)
   }
 
-  def loadIncrementHFile(options: GraphFileOptions): Int = {
+  def loadIncrementalHFiles(options: GraphFileOptions): Int = {
     /* LoadIncrementHFiles */
     val hfileArgs = Array(options.output, options.tableName)
     val hbaseConfig = HBaseConfiguration.create()
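
For context, the renamed loadIncrementalHFiles completes a bulk load by handing the generated HFiles over to HBase. Below is a minimal sketch of such a method, assuming HBase 1.x (where the bulk-load tool lives at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles) and using only the output path and table name shown in the hunk above; it is illustrative, not the exact body of the repository method.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
    import org.apache.hadoop.util.ToolRunner

    // Hypothetical stand-in for HFileGenerator.loadIncrementalHFiles(options).
    def loadIncrementalHFilesSketch(output: String, tableName: String): Int = {
      // Arguments expected by the LoadIncrementalHFiles tool: <hfile dir> <table name>.
      val hfileArgs = Array(output, tableName)
      val hbaseConfig = HBaseConfiguration.create()
      // ToolRunner wires the configuration into the tool and returns its exit code
      // (0 on success), which the caller can propagate.
      ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
    }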

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4bd2842/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 77e8cb1..866aa47 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -214,7 +214,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
  override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  private def writeBatchBulkload(df: DataFrame): Unit = {
+  private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
     val options = TaskConf.toGraphFileOptions(conf)
     val config = Management.toConfig(options.toConfigParams)
     val input = df.rdd
@@ -229,7 +229,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
 
     // finish bulk load by execute LoadIncrementHFile.
-    HFileGenerator.loadIncrementHFile(options)
+    if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options)
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
@@ -267,9 +267,10 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
     else {
       conf.options.getOrElse("writeMethod", "mutate") match {
         case "mutate" => writeBatchWithMutate(df)
-        case "bulk" => writeBatchBulkload(df)
+        case "bulk" =>
+          val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
+          writeBatchBulkload(df, runLoadIncrementalHFiles)
        case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
-
       }
     }
   }
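
With this change, a bulk-load sink can skip the final LoadIncrementalHFiles step (for example when the generated HFiles are loaded into HBase later, out of band) by setting the new option to false. The snippet below is a hedged sketch of the relevant task options and of how the sink interprets them; only the writeMethod and runLoadIncrementalHFiles keys come from the diff above, the rest of the task configuration is omitted.

    // Hypothetical options map for an S2graphSink task; only the two keys below
    // are read by the code in this commit.
    val sinkOptions: Map[String, String] = Map(
      "writeMethod"              -> "bulk",  // selects writeBatchBulkload
      "runLoadIncrementalHFiles" -> "false"  // generate HFiles only, do not load into HBase
    )

    // Mirrors the parsing in S2graphSink.write: defaults to true when the key is absent.
    val runLoadIncrementalHFiles =
      sinkOptions.getOrElse("runLoadIncrementalHFiles", "true").toBoolean
    // => false here, so HFileGenerator.loadIncrementalHFiles(options) is not called.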
