Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 1294c1822 -> f77a152c5


[S2GRAPH-251] add jdbc options #251


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2802837d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2802837d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2802837d

Branch: refs/heads/master
Commit: 2802837de584515b7776ae786e33ef2d0fe40e3c
Parents: 1294c18
Author: Chul Kang <[email protected]>
Authored: Fri Dec 21 19:24:14 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Fri Dec 21 19:24:14 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/JobDescription.scala    |  2 ++
 .../scala/org/apache/s2graph/s2jobs/task/Sink.scala   | 14 ++++++++++++++
 .../scala/org/apache/s2graph/s2jobs/task/Source.scala |  8 ++++++++
 3 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2802837d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 0943056..cfa547b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -54,6 +54,7 @@ object JobDescription extends Logger {
       case "kafka" => new KafkaSource(conf)
       case "file"  => new FileSource(conf)
       case "hive" => new HiveSource(conf)
+      case "jdbc" => new JdbcSource(conf)
       case "s2graph" => new S2GraphSource(conf)
      case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
     }
@@ -86,6 +87,7 @@ object JobDescription extends Logger {
       case "file" => new FileSink(jobName, conf)
       case "es" => new ESSink(jobName, conf)
       case "s2graph" => new S2GraphSink(jobName, conf)
+      case "jdbc" => new JdbcSink(jobName, conf)
       case "custom" =>
         val customClassOpt = conf.options.get("class")
         customClassOpt match {

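For context, a minimal sketch of a task configuration that would select the new "jdbc" type. The TaskConf constructor shown here is an assumption inferred from this diff's use of conf.`type` and conf.options, not verified against the full source; only "url" and "dbtable" are mandatory per the new tasks, the remaining keys are standard Spark JDBC options with placeholder values.

// Hypothetical: a jdbc task configuration (field names assumed).
val jdbcConf = TaskConf(
  name = "user_table",   // assumed field name
  `type` = "jdbc",       // dispatches to JdbcSource / JdbcSink above
  options = Map(
    "url"      -> "jdbc:mysql://localhost:3306/graph_dev", // placeholder
    "dbtable"  -> "users",
    "driver"   -> "com.mysql.jdbc.Driver",
    "user"     -> "sa",
    "password" -> "sa"
  )
)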
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2802837d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index dd6f41b..c5ec8f0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -180,6 +180,20 @@ class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf)
 }
 
 /**
+  * JdbcSink: writes the input DataFrame to a JDBC table via Spark's jdbc data source.
+  * @param queryName name of the sink task
+  * @param conf task configuration; mandatory options are "url" and "dbtable"
+  */
+class JdbcSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("url", "dbtable")
+  override val FORMAT: String = "jdbc"
+
+  override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
+    writer.format("jdbc").options(conf.options).save()
+  }
+}
+
+/**
   * ESSink
   *
   * @param queryName

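JdbcSink simply forwards conf.options to Spark's built-in jdbc writer, so the write is equivalent to the following plain-Spark sketch (connection values are placeholders; the save mode is whichever mode the surrounding Sink machinery has already set on the writer).

import org.apache.spark.sql.{DataFrame, SaveMode}

// Plain-Spark equivalent of JdbcSink.writeBatchInner. All option keys are
// standard Spark JDBC options; the concrete values are illustrative only.
def writeToJdbc(df: DataFrame): Unit =
  df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/graph_dev")
    .option("dbtable", "events_out")
    .option("user", "s2graph")
    .option("password", "secret")
    .mode(SaveMode.Append) // assumption; not set by writeBatchInner itself
    .save()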
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2802837d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 8e4e234..d985edc 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -131,6 +131,14 @@ class HiveSource(conf:TaskConf) extends Source(conf) {
   }
 }
 
+class JdbcSource(conf:TaskConf) extends Source(conf) {
+  override def mandatoryOptions: Set[String] = Set("url", "dbtable")
+  override def toDF(ss: SparkSession): DataFrame = {
+    ss.read.format("jdbc").options(conf.options).load()
+  }
+
+}
+
 class S2GraphSource(conf: TaskConf) extends Source(conf) {
   import org.apache.s2graph.spark.sql.streaming.S2SourceConfigs._
   override def mandatoryOptions: Set[String] = Set(

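Likewise, JdbcSource.toDF passes conf.options straight through to Spark's jdbc reader, so the standard partitioned-read options should also apply. A hedged sketch (table name, partition column, and bounds are illustrative):

import org.apache.spark.sql.{DataFrame, SparkSession}

// What JdbcSource.toDF amounts to, with optional partitioned-read options.
// partitionColumn/lowerBound/upperBound/numPartitions are standard Spark
// JDBC options and must be set together; values here are placeholders.
def readJdbc(ss: SparkSession): DataFrame =
  ss.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/graph_dev")
    .option("dbtable", "users")
    .option("partitionColumn", "id") // numeric, date, or timestamp column
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "8")
    .load()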