Repository: incubator-s2graph Updated Branches: refs/heads/master 1294c1822 -> f77a152c5
[S2GRAPH-251] add jdbc options #251 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2802837d Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2802837d Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2802837d Branch: refs/heads/master Commit: 2802837de584515b7776ae786e33ef2d0fe40e3c Parents: 1294c18 Author: Chul Kang <[email protected]> Authored: Fri Dec 21 19:24:14 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Fri Dec 21 19:24:14 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/JobDescription.scala | 2 ++ .../scala/org/apache/s2graph/s2jobs/task/Sink.scala | 14 ++++++++++++++ .../scala/org/apache/s2graph/s2jobs/task/Source.scala | 8 ++++++++ 3 files changed, 24 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2802837d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala index 0943056..cfa547b 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala @@ -54,6 +54,7 @@ object JobDescription extends Logger { case "kafka" => new KafkaSource(conf) case "file" => new FileSource(conf) case "hive" => new HiveSource(conf) + case "jdbc" => new JdbcSource(conf) case "s2graph" => new S2GraphSource(conf) case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}") } @@ -86,6 +87,7 @@ object JobDescription extends Logger { case "file" => new FileSink(jobName, conf) case "es" => new ESSink(jobName, conf) case "s2graph" => new S2GraphSink(jobName, conf) + case "jdbc" => new JdbcSink(jobName, conf) case "custom" => val customClassOpt = conf.options.get("class") customClassOpt match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2802837d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index dd6f41b..c5ec8f0 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -180,6 +180,20 @@ class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) } /** + * JdbcSink + * @param queryName + * @param conf + */ +class JdbcSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { + override def mandatoryOptions: Set[String] = Set("url", "dbtable") + override val FORMAT: String = "jdbc" + + override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = { + writer.format("jdbc").options(conf.options).save() + } +} + +/** * ESSink * * @param queryName http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2802837d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala index 8e4e234..d985edc 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -131,6 +131,14 @@ class HiveSource(conf:TaskConf) extends Source(conf) { } } +class JdbcSource(conf:TaskConf) extends Source(conf) { + override def mandatoryOptions: Set[String] = Set("url", "dbtable") + override def toDF(ss: SparkSession): DataFrame = { + ss.read.format("jdbc").options(conf.options).load() + } + +} + class S2GraphSource(conf: TaskConf) extends Source(conf) { import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._ override def mandatoryOptions: Set[String] = Set(
