add writeBatchWithMutate on S2GraphSink
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7ad721cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7ad721cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7ad721cf

Branch: refs/heads/master
Commit: 7ad721cfa5f87916cf445eabb08f14d8490a95c7
Parents: 86dcc11
Author: Chul Kang <[email protected]>
Authored: Tue Apr 3 14:52:50 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Tue Apr 3 14:52:50 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 63 ++++++++++++++++----
 1 file changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ad721cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 7c4c857..20c1558 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
@@ -28,10 +28,15 @@ import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 /**
   * Sink
   *
@@ -214,21 +219,57 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
     else {
-      val options = S2GraphHelper.toGraphFileOptions(conf)
-      val config = Management.toConfig(options.toConfigParams)
-      val input = df.rdd
+      conf.options.getOrElse("writeMethod", "mutate") match {
+        case "bulk" => writeBatchWithBulkload(df)
+        case "mutate" => writeBatchWithMutate(df)
+      }
+
+    }
+  }
+
+  private def writeBatchWithBulkload(df:DataFrame):Unit = {
+    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+    val input = df.rdd
+
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+
+    implicit val reader = new RowBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
+    val kvs = transformer.transform(input)
+
+    HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+
+    // finish bulk load by execute LoadIncrementHFile.
+    HFileGenerator.loadIncrementHFile(options)
+  }
+
+  private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import scala.collection.JavaConversions._
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+    val reader = new RowBulkFormatReader
 
-      val transformer = new SparkBulkLoaderTransformer(config, options)
+    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
 
-      implicit val reader = new RowBulkFormatReader
-      implicit val writer = new KeyValueWriter
+    df.foreachPartition{ iters =>
+      val config = ConfigFactory.parseString(serializedConfig)
+      val s2Graph = S2GraphHelper.initS2Graph(config)
 
-      val kvs = transformer.transform(input)
+      val responses = iters.grouped(groupedSize).flatMap { rows =>
+        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
 
-      HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), options)
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
 
-      // finish bulk load by execute LoadIncrementHFile.
-      HFileGenerator.loadIncrementHFile(options)
+      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+      logger.info(s"success : ${success.size}, fail : ${fail.size}")
     }
   }
 }
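
For reference, a minimal usage sketch of the new option (not part of this commit). It assumes the usual s2jobs TaskConf shape (name / type / inputs / options) and that write(df) is the batch entry point shown in the hunk above; the only key this commit actually reads is "writeMethod", everything else is illustrative.

// Hypothetical sketch: the TaskConf field names and the write(df) call are
// assumptions inferred from the surrounding s2jobs code, not part of this patch.
import org.apache.s2graph.s2jobs.task.{S2graphSink, TaskConf}
import org.apache.spark.sql.DataFrame

def writeEdges(df: DataFrame): Unit = {
  val sinkConf = TaskConf(
    name = "s2graph_sink",
    `type` = "s2graph",
    inputs = Seq("edges"),            // upstream task producing bulk-format rows
    options = Map(
      "writeMethod" -> "mutate"       // "mutate" (default) or "bulk" for the HFile bulk-load path
      // plus the graph/storage options expected by S2GraphHelper.initS2Graph
    )
  )
  new S2graphSink("edge_sink", sinkConf).write(df)  // dispatches on options("writeMethod")
}

With "mutate", rows are read by RowBulkFormatReader, grouped by S2_SINK_GROUPED_SIZE, and written per partition through s2Graph.mutateElements; "bulk" keeps the previous HFile generate-and-load path.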
