modify tc
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c47359c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c47359c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c47359c1 Branch: refs/heads/master Commit: c47359c12796521106884b09d464099a64924f5d Parents: 7ad721c Author: Chul Kang <[email protected]> Authored: Tue Apr 3 14:54:27 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Tue Apr 3 14:54:27 2018 +0900 ---------------------------------------------------------------------- .../s2graph/s2jobs/S2GraphHelperTest.scala | 13 +++++++++- .../apache/s2graph/s2jobs/task/SinkTest.scala | 27 +++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c47359c1/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala index 6e89466..f2b0102 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala @@ -19,6 +19,17 @@ package org.apache.s2graph.s2jobs -class S2GraphHelperTest { +import org.apache.s2graph.s2jobs.task.TaskConf +class S2GraphHelperTest extends BaseSparkTest { + test("toGraphFileOptions") { + val args = options.toCommand.grouped(2).map { kv => + kv.head -> kv.last + }.toMap ++ Map("db.default.url" -> "jdbc://localhost:3306/mysql") + + println(args) + val taskConf = TaskConf("dummy", "sink", Nil, args) + val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf) + println(graphFileOptions) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c47359c1/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala index a21b3df..db96328 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala @@ -42,12 +42,13 @@ class SinkTest extends BaseSparkTest { }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction") } - test("S2graphSink writeBatch.") { - val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) + test("S2graphSink writeBatchWithBulkload") { + initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}" val df = toDataFrame(Seq(bulkEdgeString)) - val args = options.toCommand.grouped(2).map { kv => + val args = Map("writeMethod" -> "bulk") ++ + options.toCommand.grouped(2).map { kv => kv.head -> kv.last }.toMap @@ -59,4 +60,24 @@ class SinkTest extends BaseSparkTest { val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike]) s2Edges.foreach { edge => println(edge) } } + + test("S2graphSink writeBatchWithMutate") { + initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) + + val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":20}" + val df = toDataFrame(Seq(bulkEdgeString)) + val args = Map("writeMethod" -> "mutate") ++ + options.toCommand.grouped(2).map { kv => + kv.head -> kv.last + }.toMap + + val conf = TaskConf("test", "sql", Seq("input"), args) + + val sink = new S2graphSink("testQuery", conf) + sink.write(df) + + val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike]) + s2Edges.foreach { edge => println(edge) } + } + }
