separate bulkload from write.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1ac70d54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1ac70d54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1ac70d54

Branch: refs/heads/master
Commit: 1ac70d540410242f2b8cff5e2664a519d1e4affd
Parents: 86dcc11
Author: DO YUNG YOON <[email protected]>
Authored: Tue Apr 3 10:06:04 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Apr 3 10:06:04 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/serde/Transformer.scala      |  2 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 32 +++++++++++---------
 2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1ac70d54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index ef1bd29..a448d3f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -34,7 +34,7 @@ import scala.reflect.ClassTag
   */
 trait Transformer[M[_]] extends Serializable {
   val config: Config
-  val options: GraphFileOptions
+//  val options: GraphFileOptions
 
   def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1ac70d54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 7c4c857..8c57539 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -209,26 +209,30 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
-  override def write(inputDF: DataFrame): Unit = {
-    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+  private def bulkload(df: DataFrame): Unit = {
+    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+    val input = df.rdd
 
-    if (inputDF.isStreaming) writeStream(df.writeStream)
-    else {
-      val options = S2GraphHelper.toGraphFileOptions(conf)
-      val config = Management.toConfig(options.toConfigParams)
-      val input = df.rdd
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new RowBulkFormatReader
+    implicit val writer = new KeyValueWriter
 
-      val transformer = new SparkBulkLoaderTransformer(config, options)
+    val kvs = transformer.transform(input)
 
-      implicit val reader = new RowBulkFormatReader
-      implicit val writer = new KeyValueWriter
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
 
-      val kvs = transformer.transform(input)
+    // finish bulk load by execute LoadIncrementHFile.
+    HFileGenerator.loadIncrementHFile(options)
+  }
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
-      // finish bulk load by execute LoadIncrementHFile.
-      HFileGenerator.loadIncrementHFile(options)
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+      bulkload(df)
     }
   }
 }

Reply via email to