Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 653deee07 -> 245335ad3


add simple etl job interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e0ba0aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e0ba0aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e0ba0aa7

Branch: refs/heads/master
Commit: e0ba0aa74f05a0f57bfa17ac87f898fd3456a790
Parents: 2ce86fa
Author: Chul Kang <[email protected]>
Authored: Thu Mar 22 16:58:18 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:02:32 2018 +0900

----------------------------------------------------------------------
 project/Common.scala                            |   2 +-
 s2jobs/build.sbt                                |   4 +-
 .../scala/org/apache/s2graph/s2jobs/Job.scala   |  49 ++++++
 .../apache/s2graph/s2jobs/JobDescription.scala  |  69 ++++++++
 .../org/apache/s2graph/s2jobs/JobLauncher.scala |  69 ++++++++
 .../org/apache/s2graph/s2jobs/Logger.scala      |   7 +
 .../org/apache/s2graph/s2jobs/Schema.scala      |  16 ++
 .../apache/s2graph/s2jobs/task/Process.scala    |  34 ++++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 165 +++++++++++++++++++
 .../org/apache/s2graph/s2jobs/task/Source.scala |  85 ++++++++++
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  19 +++
 .../s2graph/s2jobs/task/ProcessTest.scala       |  29 ++++
 12 files changed, 546 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index e2323a2..f187e07 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -29,7 +29,7 @@ object Common {
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
-  val elastic4sVersion = "6.1.0"
+  val elastic4sVersion = "6.1.1"
 
  /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
   val loggingRuntime = Seq(

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index b915238..02ed48d 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -35,7 +35,9 @@ libraryDependencies ++= Seq(
   "org.specs2" %% "specs2-core" % specs2Version % "test",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
-  "com.github.scopt" %% "scopt" % "3.7.0"
+  "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
+  "com.github.scopt" %% "scopt" % "3.7.0",
+  "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
 crossScalaVersions := Seq("2.10.6")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
new file mode 100644
index 0000000..b8fccc2
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -0,0 +1,49 @@
+package org.apache.s2graph.s2jobs
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.s2graph.s2jobs.task._
+
+import scala.collection.mutable
+
+class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Logger {
+  private val dfMap = mutable.Map[String, DataFrame]()
+
+  def run() = {
+    // source
+    jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+    logger.debug(s"valid source DF set : ${dfMap.keySet}")
+
+    // process
+    var processRst:Seq[(String, DataFrame)] = Nil
+    do {
+      processRst = getValidProcess(jobDesc.processes)
+      processRst.foreach { case (name, df) => dfMap.put(name, df)}
+
+    } while(processRst.nonEmpty)
+
+    logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    // sinks
+    jobDesc.sinks.foreach { s =>
+      val inputDFs = s.conf.inputs.flatMap{ input => dfMap.get(input)}
+      if (inputDFs.isEmpty) throw new IllegalArgumentException(s"sink has no valid inputs (${s.conf.name})")
+
+      // use only first input
+      s.write(inputDFs.head)
+    }
+
+  }
+
+  private def getValidProcess(processes:Seq[Process]):Seq[(String, DataFrame)] = {
+    val dfKeys = dfMap.keySet
+
+    processes.filter{ p =>
+        // runnable: every input is already materialized and this process has not run yet
+        val existAllInput = p.conf.inputs.forall(dfKeys)
+        !dfKeys(p.conf.name) && existAllInput
+    }
+    .map { p =>
+      val inputMap = p.conf.inputs.map{ input => (input,  dfMap(input)) }.toMap
+      p.conf.name -> p.execute(ss, inputMap)
+    }
+  }
+}
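
For reference, a job description that this Job can run might look like the following sketch (paths, task names, and the label value are hypothetical; the top-level keys and per-task fields follow the TaskConf parsing in JobDescription.scala):

    {
      "name": "sample_job",
      "source": [
        { "name": "input", "type": "file",
          "options": { "paths": "/tmp/edges", "format": "parquet" } }
      ],
      "process": [
        { "name": "filtered", "type": "sql", "inputs": ["input"],
          "options": { "sql": "SELECT * FROM input WHERE label = 'friend'" } }
      ],
      "sink": [
        { "name": "output", "type": "file", "inputs": ["filtered"],
          "options": { "path": "/tmp/output", "format": "parquet" } }
      ]
    }

Note that run() resolves processes iteratively until no new DataFrame can be produced, and each sink consumes only the first of its resolved inputs.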

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
new file mode 100644
index 0000000..0a9194f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -0,0 +1,69 @@
+package org.apache.s2graph.s2jobs
+
+import play.api.libs.json.{JsValue, Json}
+import org.apache.s2graph.s2jobs.task._
+
+case class JobDescription(
+                           name:String,
+                           sources:Seq[Source],
+                           processes:Seq[task.Process],
+                           sinks:Seq[Sink]
+                         )
+
+object JobDescription extends Logger {
+  val dummy = JobDescription("dummy", Nil, Nil, Nil)
+
+  def apply(jsVal:JsValue):JobDescription = {
+    implicit val TaskConfReader = Json.reads[TaskConf]
+
+    logger.debug(s"JobDescription: ${jsVal}")
+
+    val jobName = (jsVal \ "name").as[String]
+    val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
+    val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
+    val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
+
+    JobDescription(jobName, sources, processes, sinks)
+  }
+
+  def getSource(conf:TaskConf):Source = {
+    conf.`type` match {
+      case "kafka" => new KafkaSource(conf)
+      case "file"  => new FileSource(conf)
+      case "hive" => new HiveSource(conf)
+      case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
+    }
+  }
+
+  def getProcess(conf:TaskConf): task.Process = {
+    conf.`type` match {
+      case "sql" => new SqlProcess(conf)
+      case "custom" =>
+        val customClassOpt = conf.options.get("class")
+        customClassOpt match {
+          case Some(customClass:String) =>
+            logger.debug(s"custom class init.. $customClass")
+
+            Class.forName(customClass)
+              .getConstructor(classOf[TaskConf])
+              .newInstance(conf)
+              .asInstanceOf[task.Process]
+
+          case None => throw new IllegalArgumentException(s"custom class name is missing in options: ${conf}")
+        }
+
+      case _ => throw new IllegalArgumentException(s"unsupported process type : ${conf.`type`}")
+    }
+  }
+
+  def getSink(jobName:String, conf:TaskConf):Sink = {
+    conf.`type` match {
+      case "kafka" => new KafkaSink(jobName, conf)
+      case "file" => new FileSink(jobName, conf)
+      case "es" => new ESSink(jobName, conf)
+      case "s2graph" => new S2graphSink(jobName, conf)
+      case _ => throw new IllegalArgumentException(s"unsupported sink type : ${conf.`type`}")
+    }
+
+  }
+}
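
A custom process only needs a public constructor taking a TaskConf, since it is instantiated reflectively above. A minimal sketch (package, class name, and logic are hypothetical):

    package com.example.etl

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.s2graph.s2jobs.task.{Process, TaskConf}

    class DedupProcess(conf: TaskConf) extends Process(conf) {
      // no extra options are required for this sketch
      override def mandatoryOptions: Set[String] = Set.empty

      // drop duplicate rows from the first input DataFrame
      override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame =
        inputMap(conf.inputs.head).dropDuplicates()
    }

It would then be referenced with a task entry such as { "type": "custom", "options": { "class": "com.example.etl.DedupProcess" } }.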

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
new file mode 100644
index 0000000..d0b355c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -0,0 +1,69 @@
+package org.apache.s2graph.s2jobs
+
+import org.apache.spark.sql.SparkSession
+import play.api.libs.json.{JsValue, Json}
+
+import scala.io.Source
+
+case class JobOption(
+                      name:String = "S2BatchJob",
+                      confType:String = "db",
+                      jobId:Int = -1,
+                      confFile:String = ""
+                    )
+
+object JobLauncher extends Logger {
+
+  def parseArguments(args: Array[String]): JobOption = {
+    val parser = new scopt.OptionParser[JobOption]("run") {
+      opt[String]('n', "name").required().action((x, c) =>
+        c.copy(name = x)).text("job display name")
+
+      cmd("file").action((_, c) => c.copy(confType = "file"))
+        .text("get config from file")
+        .children(
+          opt[String]('f', "confFile").required().valueName("<file>").action((x, c) =>
+            c.copy(confFile = x)).text("configuration file")
+        )
+
+      cmd("db").action((_, c) => c.copy(confType = "db"))
+        .text("get config from db")
+        .children(
+          opt[String]('i', "jobId").required().valueName("<jobId>").action((x, c) =>
+            c.copy(jobId = x.toInt)).text("job id")
+        )
+    }
+
+    parser.parse(args, JobOption()) match {
+      case Some(o) => o
+      case None =>
+        parser.showUsage()
+        throw new IllegalArgumentException(s"failed to parse options... (${args.mkString(",")})")
+    }
+  }
+
+  def getConfig(options: JobOption):JsValue = options.confType match {
+    case "file" =>
+      Json.parse(Source.fromFile(options.confFile).mkString)
+    case "db" =>
+      throw new IllegalArgumentException(s"the 'db' option, which reads the job config from a database, is not supported yet")
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val options = parseArguments(args)
+    logger.info(s"Job Options : ${options}")
+
+    val jobDescription = JobDescription(getConfig(options))
+
+    val ss = SparkSession
+      .builder()
+      .appName(s"${jobDescription.name}")
+      .config("spark.driver.maxResultSize", "20g")
+      .enableHiveSupport()
+      .getOrCreate()
+
+    val job = new Job(ss, jobDescription)
+    job.run()
+  }
+}
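
Assuming an assembly jar (the jar name here is hypothetical), the launcher can be invoked via the file subcommand, since the db path is not implemented yet; the option names follow the scopt parser above:

    spark-submit --class org.apache.s2graph.s2jobs.JobLauncher \
      s2jobs-assembly.jar \
      --name sample_job file --confFile job_desc.json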

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala
new file mode 100644
index 0000000..1df55f0
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.s2jobs
+
+import org.apache.commons.logging.{Log, LogFactory}
+
+trait Logger {
+  @transient lazy val logger: Log = LogFactory.getLog(this.getClass)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
new file mode 100644
index 0000000..f214218
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
@@ -0,0 +1,16 @@
+package org.apache.s2graph.s2jobs
+
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+
+object Schema {
+  val BulkLoadSchema = StructType(Seq(
+    StructField("timestamp", LongType, false),
+    StructField("operation", StringType, false),
+    StructField("elem", StringType, false),
+    StructField("from", StringType, false),
+    StructField("to", StringType, false),
+    StructField("label", StringType, false),
+    StructField("props", StringType, false),
+    StructField("direction", StringType, true)
+  ))
+}
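
BulkLoadSchema mirrors the bulk-load TSV layout read by FileSource's "edgeLog" format, so a matching tab-separated input line (all values hypothetical) would be:

    1501011600000	insert	edge	alice	bob	friend	{"weight": 10}

with the trailing direction column optional (the only nullable field).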

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
new file mode 100644
index 0000000..73d2fab
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
@@ -0,0 +1,34 @@
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+  * Process
+  * @param conf
+  */
+abstract class Process(override val conf:TaskConf) extends Task {
+  def execute(ss:SparkSession, inputMap:Map[String, DataFrame]):DataFrame
+}
+
+/**
+  * SqlProcess
+  * @param conf
+  */
+class SqlProcess(conf:TaskConf) extends Process(conf) {
+  override def mandatoryOptions: Set[String] = Set("sql")
+
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    // create temp table
+    inputMap.foreach { case (name, df) =>
+      logger.debug(s"${LOG_PREFIX} create temp table : $name")
+      df.printSchema()
+      df.createOrReplaceTempView(name)
+    }
+
+    val sql = conf.options("sql")
+    logger.debug(s"${LOG_PREFIX} sql : $sql")
+
+    ss.sql(sql)
+  }
+}
+
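Because every input DataFrame is registered as a temp view under its task name, a SqlProcess can also join several inputs; a hypothetical task entry (input names and columns invented for illustration):

    { "name": "joined", "type": "sql", "inputs": ["edges", "users"],
      "options": { "sql": "SELECT e.*, u.age FROM edges e JOIN users u ON e.`from` = u.id" } }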

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
new file mode 100644
index 0000000..6249891
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -0,0 +1,165 @@
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
+import org.elasticsearch.spark.sql.EsSparkSQL
+
+/**
+  * Sink
+  * @param queryName
+  * @param conf
+  */
+abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
+  val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}"
+  val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
+
+  val FORMAT:String
+
+  def preprocess(df:DataFrame):DataFrame = df
+
+  def write(inputDF: DataFrame):Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else writeBatch(df.write)
+  }
+
+  protected def writeStream(writer: DataStreamWriter[Row]): Unit = {
+    val partitionsOpt = conf.options.get("partitions")
+    val mode = conf.options.getOrElse("mode", "append") match {
+      case "append" => OutputMode.Append()
+      case "update" => OutputMode.Update()
+      case "complete" => OutputMode.Complete()
+      case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'")
+                OutputMode.Append()
+    }
+    val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
+    val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+
+    val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
+
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+
+    val query = partitionedWriter
+      .queryName(queryName)
+      .format(FORMAT)
+      .options(cfg)
+      .trigger(Trigger.ProcessingTime(interval))
+      .outputMode(mode)
+      .start()
+
+    query.awaitTermination()
+  }
+
+  protected def writeBatch(writer: DataFrameWriter[Row]): Unit = {
+    val partitionsOpt = conf.options.get("partitions")
+    val mode = conf.options.getOrElse("mode", "overwrite") match {
+      case "overwrite" => SaveMode.Overwrite
+      case "append" => SaveMode.Append
+      case "errorIfExists" => SaveMode.ErrorIfExists
+      case "ignore" => SaveMode.Ignore
+      case _ => SaveMode.Overwrite
+    }
+
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+
+    writeBatchInner(partitionedWriter.format(FORMAT).mode(mode))
+  }
+
+  protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
+    val outputPath = conf.options("path")
+    writer.save(outputPath)
+  }
+
+  protected def repartition(df:DataFrame, defaultParallelism:Int) = {
+    conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
+      case Some(numOfPartitions:Int) =>
+        if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions)
+        else df.coalesce(numOfPartitions)
+      case None => df
+    }
+  }
+}
+
+/**
+  * KafkaSink
+  * @param queryName
+  * @param conf
+  */
+class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic")
+  override val FORMAT: String = "kafka"
+
+  override def preprocess(df:DataFrame):DataFrame = {
+    import org.apache.spark.sql.functions._
+
+    logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
+
+    conf.options.getOrElse("format", "json") match {
+      case "tsv" =>
+        val delimiter = conf.options.getOrElse("delimiter", "\t")
+
+        val columns = df.columns
+        df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value"))
+      case format:String =>
+        if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format 
'$format'. use default json format")
+        df.selectExpr("to_json(struct(*)) AS value")
+    }
+  }
+
+  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
+    throw new RuntimeException(s"batch write is not supported for ${this.getClass.getSimpleName} : ${conf.name}")
+}
+
+/**
+  * FileSink
+  * @param queryName
+  * @param conf
+  */
+class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("path", "format")
+  override val FORMAT: String = conf.options.getOrElse("format", "parquet")
+}
+
+/**
+  * HiveSink
+  * @param queryName
+  * @param conf
+  */
+class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("database", "table")
+  override val FORMAT: String = "hive"
+
+  override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
+    val database = conf.options("database")
+    val table = conf.options("table")
+
+    writer.insertInto(s"${database}.${table}")
+  }
+
+  override protected def writeStream(writer: DataStreamWriter[Row]): Unit =
+    throw new RuntimeException(s"stream write is not supported for ${this.getClass.getSimpleName} : ${conf.name}")
+
+}
+
+/**
+  * ESSink
+  * @param queryName
+  * @param conf
+  */
+class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port")
+  override val FORMAT: String = "es"
+
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(inputDF, inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+
+      val resource = conf.options("path")
+      EsSparkSQL.saveToEs(df, resource, conf.options)
+    }
+  }
+}
+
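A hypothetical Kafka sink entry, showing the mandatory options plus the streaming knobs read above (broker address, topic, and checkpoint path are placeholders):

    { "name": "to_kafka", "type": "kafka", "inputs": ["filtered"],
      "options": {
        "kafka.bootstrap.servers": "localhost:9092",
        "topic": "s2graph_etl",
        "format": "tsv",
        "mode": "append",
        "interval": "5 seconds",
        "checkpointLocation": "/tmp/streamingjob/to_kafka"
      } }

Note that KafkaSink only handles streaming input, since its writeBatch deliberately throws.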

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
new file mode 100644
index 0000000..c81ac9f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -0,0 +1,85 @@
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+
+/**
+  * Source
+  *
+  * @param conf
+  */
+abstract class Source(override val conf:TaskConf) extends Task {
+  def toDF(ss:SparkSession):DataFrame
+}
+
+class KafkaSource(conf:TaskConf) extends Source(conf) {
+  val DEFAULT_FORMAT = "raw"
+  override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe")
+
+  override def toDF(ss:SparkSession):DataFrame = {
+    logger.info(s"${LOG_PREFIX} options: ${conf.options}")
+
+    val format = conf.options.getOrElse("format", "raw")
+    val df = ss.readStream.format("kafka").options(conf.options).load()
+
+    format match {
+      case "raw" => df
+      case "json" => parseJsonSchema(ss, df)
+//      case "custom" => parseCustomSchema(df)
+      case _ =>
+        logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. use default schema ")
+        df
+    }
+  }
+
+  def parseJsonSchema(ss:SparkSession, df:DataFrame):DataFrame = {
+    import org.apache.spark.sql.functions.from_json
+    import org.apache.spark.sql.types.DataType
+    import ss.implicits._
+
+    val schemaOpt = conf.options.get("schema")
+    schemaOpt match {
+      case Some(schemaAsJson:String) =>
+        val dataType:DataType = DataType.fromJson(schemaAsJson)
+        logger.debug(s"${LOG_PREFIX} schema : ${dataType.sql}")
+
+        df.selectExpr("CAST(value AS STRING)")
+          .select(from_json('value, dataType) as 'struct)
+          .select("struct.*")
+
+      case None =>
+        logger.warn(s"${LOG_PREFIX} json format does not have schema.. use default schema ")
+        df
+    }
+  }
+}
+
+class FileSource(conf:TaskConf) extends Source(conf) {
+  val DEFAULT_FORMAT = "parquet"
+  override def mandatoryOptions: Set[String] = Set("paths")
+
+  override def toDF(ss: SparkSession): DataFrame = {
+    import org.apache.s2graph.s2jobs.Schema._
+    val paths = conf.options("paths").split(",")
+    val format = conf.options.getOrElse("format", DEFAULT_FORMAT)
+
+    format match {
+      case "edgeLog" =>
+        ss.read.format("com.databricks.spark.csv").option("delimiter", "\t")
+          .schema(BulkLoadSchema).load(paths: _*)
+      case _ => ss.read.format(format).load(paths: _*)
+    }
+  }
+}
+
+class HiveSource(conf:TaskConf) extends Source(conf) {
+  override def mandatoryOptions: Set[String] = Set("database", "table")
+
+  override def toDF(ss: SparkSession): DataFrame = {
+    val database = conf.options("database")
+    val table = conf.options("table")
+
+    val sql = conf.options.getOrElse("sql", s"SELECT * FROM ${database}.${table}")
+    ss.sql(sql)
+  }
+}
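
For a json-format KafkaSource, the schema option is a Spark DataType JSON string (it is fed to DataType.fromJson above); a hypothetical entry with a two-field struct:

    { "name": "events", "type": "kafka",
      "options": {
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "events",
        "format": "json",
        "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"from\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"to\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
      } }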

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
new file mode 100644
index 0000000..bd9259f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -0,0 +1,19 @@
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.s2jobs.Logger
+
+case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
+
+trait Task extends Serializable with Logger {
+  val conf:TaskConf
+  val LOG_PREFIX = s"[${this.getClass.getSimpleName}]"
+
+  def mandatoryOptions:Set[String]
+  def isValidate:Boolean = mandatoryOptions.subsetOf(conf.options.keySet)
+
+  require(isValidate, s"${LOG_PREFIX} missing mandatory options '${mandatoryOptions.mkString(",")}' in task options (${conf.options.keySet.mkString(",")})")
+
+  println(s"${LOG_PREFIX} init : $conf")
+}
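
The require above runs at construction time, so a misconfigured task fails fast; a sketch (task names and paths hypothetical):

    // FileSink declares "path" and "format" as mandatory, so this throws
    // IllegalArgumentException before any Spark action runs
    val conf = TaskConf("bad_sink", "file", Nil, Map("path" -> "/tmp/out"))
    new FileSink("sample_job", conf)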

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
new file mode 100644
index 0000000..adbebac
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
@@ -0,0 +1,29 @@
+package org.apache.s2graph.s2jobs.task
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.scalatest.FunSuite
+
+class ProcessTest extends FunSuite with DataFrameSuiteBase {
+
+  test("SqlProcess execute sql") {
+    import spark.implicits._
+
+    val inputDF = Seq(
+      ("a", "b", "friend"),
+      ("a", "c", "friend"),
+      ("a", "d", "friend")
+    ).toDF("from", "to", "label")
+
+    val inputMap = Map("input" -> inputDF)
+    val sql = "SELECT * FROM input WHERE to = 'b'"
+    val conf = TaskConf("test", "sql", Seq("input"), Map("sql" -> sql))
+
+    val process = new SqlProcess(conf)
+
+    val rstDF = process.execute(spark, inputMap)
+    val tos = rstDF.collect().map{ row => row.getAs[String]("to")}
+
+    assert(tos.size == 1)
+    assert(tos.head == "b")
+  }
+}
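
The suite can be run from the repository root with something like the following (the module id is assumed from the s2jobs directory name):

    sbt "project s2jobs" test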
