merge writeBatch for S2GraphSink
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/fc9fde7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/fc9fde7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/fc9fde7f

Branch: refs/heads/master
Commit: fc9fde7fbde1bcc18f2dec41a54ec70af01cd80c
Parents: c47359c c129ed0
Author: Chul Kang <[email protected]>
Authored: Tue Apr 3 18:47:13 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Tue Apr 3 18:47:13 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala |  9 -----
 .../s2jobs/loader/GraphFileOptions.scala      |  6 ++++
 .../s2graph/s2jobs/serde/Transformer.scala    |  2 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala | 35 ++++++++++----------
 .../org/apache/s2graph/s2jobs/task/Task.scala |  9 +++++
 .../apache/s2graph/s2jobs/BaseSparkTest.scala |  2 --
 .../apache/s2graph/s2jobs/task/SinkTest.scala | 31 ++++++++++++-----
 7 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 20c1558,0dce19a..77e8cb1
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@@ -214,21 -209,8 +214,8 @@@ class S2graphSink(queryName: String, co

   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"

-  override def write(inputDF: DataFrame): Unit = {
-    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
-
-    if (inputDF.isStreaming) writeStream(df.writeStream)
-    else {
-      conf.options.getOrElse("writeMethod", "mutate") match {
-        case "bulk" => writeBatchWithBulkload(df)
-        case "mutate" => writeBatchWithMutate(df)
-      }
-
-    }
-  }
-
-  private def writeBatchWithBulkload(df:DataFrame):Unit = {
-    val options = S2GraphHelper.toGraphFileOptions(conf)
-  private def bulkload(df: DataFrame): Unit = {
++  private def writeBatchBulkload(df: DataFrame): Unit = {
+    val options = TaskConf.toGraphFileOptions(conf)
     val config = Management.toConfig(options.toConfigParams)
     val input = df.rdd
@@@ -245,32 -227,13 +232,46 @@@
     HFileGenerator.loadIncrementHFile(options)
   }

+  private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import scala.collection.JavaConversions._
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
-    val graphConfig:Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
++    val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
+
+    val reader = new RowBulkFormatReader
+
+    val groupedSize = getConfigString(graphConfig, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(graphConfig, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+
-    df.foreachPartition{ iters =>
++    df.foreachPartition { iters =>
+      val config = ConfigFactory.parseString(serializedConfig)
+      val s2Graph = S2GraphHelper.initS2Graph(config)
+
+      val responses = iters.grouped(groupedSize).flatMap { rows =>
+        val elements = rows.flatMap(row => reader.read(s2Graph)(row))
+
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
+
+      val (success, fail) = responses.toSeq.partition(r => r.isSuccess)
+      logger.info(s"success : ${success.size}, fail : ${fail.size}")
+    }
+  }
++
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
-      bulkload(df)
++      conf.options.getOrElse("writeMethod", "mutate") match {
++        case "mutate" => writeBatchWithMutate(df)
++        case "bulk" => writeBatchBulkload(df)
++        case writeMethod:String => throw new IllegalArgumentException(s"unsupported write method '$writeMethod' (valid method: mutate, bulk)")
++
++      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 4f02808,4f02808..0461d1e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -33,8 -33,8 +33,6 @@@ import org.scalatest.{BeforeAndAfterAll
  import scala.util.Try

  class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
--  private val master = "local[2]"
--  private val appName = "example-spark"

    protected val options = GraphFileOptions(
      input = "/tmp/test.txt",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fc9fde7f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index db96328,a21b3df..02b724e
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@@ -26,6 -26,6 +26,10 @@@ import org.apache.spark.sql.DataFram
  import scala.collection.JavaConverters._

  class SinkTest extends BaseSparkTest {
++  override def beforeAll(): Unit = {
++    super.beforeAll()
++    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
++  }
    def toDataFrame(edges: Seq[String]): DataFrame = {
      import spark.sqlContext.implicits._
      val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
@@@ -42,13 -42,12 +46,11 @@@
      }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
    }

-  test("S2graphSink writeBatch.") {
-    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-
+  test("S2graphSink writeBatchWithBulkload") {
-    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
     val df = toDataFrame(Seq(bulkEdgeString))
-    val args = options.toCommand.grouped(2).map { kv =>
+    val args = Map("writeMethod" -> "bulk") ++
+      options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
     }.toMap
@@@ -59,25 -58,5 +61,38 @@@
     val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
     s2Edges.foreach { edge => println(edge) }
++
++    val filteredEdges = s2Edges.filter{ edge =>
++      edge.srcVertex.innerIdVal.toString == "a" &&
++        edge.tgtVertex.innerIdVal.toString == "b" &&
++        edge.label() == "friends"
++    }
++
++    assert(filteredEdges.size == 1)
+
+  }
+
+  test("S2graphSink writeBatchWithMutate") {
-    initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
-
-    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}"
++   val bulkEdgeString = "1416236400000\tinsert\tedge\tb\tc\tfriends\t{\"since\":1316236400000,\"score\":20}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = Map("writeMethod" -> "mutate") ++
-      options.toCommand.grouped(2).map { kv =>
-        kv.head -> kv.last
-      }.toMap
++     options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
++
++   val filteredEdges = s2Edges.filter{ edge =>
++     edge.srcVertex.innerIdVal.toString == "b" &&
++       edge.tgtVertex.innerIdVal.toString == "c" &&
++       edge.getTs() == 1416236400000L &&
++       edge.label() == "friends"
++   }
++
++   assert(filteredEdges.size == 1)
+  }
 }
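For context, a minimal sketch of how a batch job might drive the merged sink, following the usage shown in SinkTest above. This is an illustration, not part of the commit: the task name "test", query name "testQuery", the input DataFrame, and the bare option map are placeholder assumptions (a real job would also pass the S2Graph connection options, as the tests do via options.toCommand). After this merge, only "mutate" and "bulk" are accepted for writeMethod; any other value throws IllegalArgumentException.

  import org.apache.s2graph.s2jobs.task.{S2graphSink, TaskConf}
  import org.apache.spark.sql.DataFrame

  // df: a DataFrame of bulk-format edge rows, e.g. as built by toDataFrame in SinkTest.
  def writeEdges(df: DataFrame): Unit = {
    // "mutate" routes to writeBatchWithMutate; "bulk" routes to writeBatchBulkload.
    val conf = TaskConf("test", "sql", Seq("input"), Map("writeMethod" -> "mutate"))
    val sink = new S2graphSink("testQuery", conf)
    sink.write(df)
  }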
