Add runLoadIncrementalHFiles option for S2graphSink.write.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f4bd2842 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f4bd2842 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f4bd2842 Branch: refs/heads/master Commit: f4bd2842f7c0bc0b3eb1a7f5467c25e38e4da999 Parents: f2857d1 Author: DO YUNG YOON <[email protected]> Authored: Wed Apr 4 11:31:26 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Wed Apr 4 11:31:26 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/loader/HFileGenerator.scala | 2 +- .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4bd2842/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index da190ee..e7535d4 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -125,7 +125,7 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { HFileGenerator.generateHFile(sc, config, kvs, _options) } - def loadIncrementHFile(options: GraphFileOptions): Int = { + def loadIncrementalHFiles(options: GraphFileOptions): Int = { /* LoadIncrementHFiles */ val hfileArgs = Array(options.output, options.tableName) val hbaseConfig = HBaseConfiguration.create() 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4bd2842/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 77e8cb1..866aa47 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -214,7 +214,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider" - private def writeBatchBulkload(df: DataFrame): Unit = { + private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = { val options = TaskConf.toGraphFileOptions(conf) val config = Management.toConfig(options.toConfigParams) val input = df.rdd @@ -229,7 +229,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options) // finish bulk load by execute LoadIncrementHFile. - HFileGenerator.loadIncrementHFile(options) + if (runLoadIncrementalHFiles) HFileGenerator.loadIncrementalHFiles(options) } private def writeBatchWithMutate(df:DataFrame):Unit = { @@ -267,9 +267,10 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con else { conf.options.getOrElse("writeMethod", "mutate") match { case "mutate" => writeBatchWithMutate(df) - case "bulk" => writeBatchBulkload(df) + case "bulk" => + val runLoadIncrementalHFiles = conf.options.getOrElse("runLoadIncrementalHFiles", "true").toBoolean + writeBatchBulkload(df, runLoadIncrementalHFiles) case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)") - } } }
