Separate bulk load from write.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1ac70d54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1ac70d54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1ac70d54 Branch: refs/heads/master Commit: 1ac70d540410242f2b8cff5e2664a519d1e4affd Parents: 86dcc11 Author: DO YUNG YOON <[email protected]> Authored: Tue Apr 3 10:06:04 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Apr 3 10:06:04 2018 +0900 ---------------------------------------------------------------------- .../s2graph/s2jobs/serde/Transformer.scala | 2 +- .../org/apache/s2graph/s2jobs/task/Sink.scala | 32 +++++++++++--------- 2 files changed, 19 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1ac70d54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala index ef1bd29..a448d3f 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala @@ -34,7 +34,7 @@ import scala.reflect.ClassTag */ trait Transformer[M[_]] extends Serializable { val config: Config - val options: GraphFileOptions +// val options: GraphFileOptions def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1ac70d54/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 7c4c857..8c57539 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -209,26 +209,30 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider" - override def write(inputDF: DataFrame): Unit = { - val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) + private def bulkload(df: DataFrame): Unit = { + val options = S2GraphHelper.toGraphFileOptions(conf) + val config = Management.toConfig(options.toConfigParams) + val input = df.rdd - if (inputDF.isStreaming) writeStream(df.writeStream) - else { - val options = S2GraphHelper.toGraphFileOptions(conf) - val config = Management.toConfig(options.toConfigParams) - val input = df.rdd + val transformer = new SparkBulkLoaderTransformer(config, options) + + implicit val reader = new RowBulkFormatReader + implicit val writer = new KeyValueWriter - val transformer = new SparkBulkLoaderTransformer(config, options) + val kvs = transformer.transform(input) - implicit val reader = new RowBulkFormatReader - implicit val writer = new KeyValueWriter + HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options) - val kvs = transformer.transform(input) + // finish bulk load by execute LoadIncrementHFile. + HFileGenerator.loadIncrementHFile(options) + } - HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options) + override def write(inputDF: DataFrame): Unit = { + val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) - // finish bulk load by execute LoadIncrementHFile. 
- HFileGenerator.loadIncrementHFile(options) + if (inputDF.isStreaming) writeStream(df.writeStream) + else { + bulkload(df) } } }
