Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 653deee07 -> 245335ad3
add simple etl job interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e0ba0aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e0ba0aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e0ba0aa7

Branch: refs/heads/master
Commit: e0ba0aa74f05a0f57bfa17ac87f898fd3456a790
Parents: 2ce86fa
Author: Chul Kang <[email protected]>
Authored: Thu Mar 22 16:58:18 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:02:32 2018 +0900

----------------------------------------------------------------------
 project/Common.scala                            |   2 +-
 s2jobs/build.sbt                                |   4 +-
 .../scala/org/apache/s2graph/s2jobs/Job.scala   |  49 ++++++
 .../apache/s2graph/s2jobs/JobDescription.scala  |  69 ++++++++
 .../org/apache/s2graph/s2jobs/JobLauncher.scala |  69 ++++++++
 .../org/apache/s2graph/s2jobs/Logger.scala      |   7 +
 .../org/apache/s2graph/s2jobs/Schema.scala      |  16 ++
 .../apache/s2graph/s2jobs/task/Process.scala    |  34 ++++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 165 +++++++++++++++++++
 .../org/apache/s2graph/s2jobs/task/Source.scala |  85 ++++++++++
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  19 +++
 .../s2graph/s2jobs/task/ProcessTest.scala       |  29 ++++
 12 files changed, 546 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index e2323a2..f187e07 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -29,7 +29,7 @@ object Common {
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
-  val elastic4sVersion = "6.1.0"
+  val elastic4sVersion = "6.1.1"
 
   /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
   val loggingRuntime = Seq(

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index b915238..02ed48d 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -35,7 +35,9 @@ libraryDependencies ++= Seq(
   "org.specs2" %% "specs2-core" % specs2Version % "test",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
-  "com.github.scopt" %% "scopt" % "3.7.0"
+  "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
+  "com.github.scopt" %% "scopt" % "3.7.0",
+  "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
 crossScalaVersions := Seq("2.10.6")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
new file mode 100644
index 0000000..b8fccc2
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -0,0 +1,49 @@
+package org.apache.s2graph.s2jobs
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.s2graph.s2jobs.task._
+
+import scala.collection.mutable
+
+class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Logger {
+  private val dfMap = mutable.Map[String, DataFrame]()
+
+  def run() = {
+    // source
+    jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+    logger.debug(s"valid source DF set : ${dfMap.keySet}")
+
+    // process
+    var processRst:Seq[(String, DataFrame)] = Nil
+    do {
+      processRst = getValidProcess(jobDesc.processes)
+      processRst.foreach { case (name, df) => dfMap.put(name, df)}
+
+    } while(processRst.nonEmpty)
+
+    logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    // sinks
+    jobDesc.sinks.foreach { s =>
+      val inputDFs = s.conf.inputs.flatMap{ input => dfMap.get(input)}
+      if (inputDFs.isEmpty) throw new IllegalArgumentException(s"sink has not valid inputs (${s.conf.name})")
+
+      // use only first input
+      s.write(inputDFs.head)
+    }
+
+  }
+
+  private def getValidProcess(processes:Seq[Process]):Seq[(String, DataFrame)] = {
+    val dfKeys = dfMap.keySet
+
+    processes.filter{ p =>
+      var existAllInput = true
+      p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+      !dfKeys(p.conf.name) && existAllInput
+    }
+    .map { p =>
+      val inputMap = p.conf.inputs.map{ input => (input, dfMap(input)) }.toMap
+      p.conf.name -> p.execute(ss, inputMap)
+    }
+  }
+}
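
run() resolves tasks by name: each source registers its DataFrame under its task name, a process runs once all of its inputs are registered, and each sink writes the first of its inputs. Wiring the pieces by hand looks roughly like this (a sketch only; the master setting, paths and task names are illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession
import org.apache.s2graph.s2jobs.{Job, JobDescription}
import org.apache.s2graph.s2jobs.task.{FileSink, FileSource, SqlProcess, TaskConf}

val ss = SparkSession.builder().master("local[*]").appName("job-example").getOrCreate()

// hypothetical paths; each task is keyed by its TaskConf.name
val source  = new FileSource(TaskConf("edges", "file", Nil, Map("paths" -> "/tmp/edges", "format" -> "parquet")))
val process = new SqlProcess(TaskConf("friends", "sql", Seq("edges"), Map("sql" -> "SELECT * FROM edges WHERE label = 'friend'")))
val sink    = new FileSink("job-example", TaskConf("out", "file", Seq("friends"), Map("path" -> "/tmp/friends", "format" -> "parquet")))

// sources fill dfMap, the process fires once "edges" is available, the sink writes "friends"
new Job(ss, JobDescription("job-example", Seq(source), Seq(process), Seq(sink))).run()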

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
new file mode 100644
index 0000000..0a9194f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -0,0 +1,69 @@
+package org.apache.s2graph.s2jobs
+
+import play.api.libs.json.{JsValue, Json}
+import org.apache.s2graph.s2jobs.task._
+
+case class JobDescription(
+                           name:String,
+                           sources:Seq[Source],
+                           processes:Seq[task.Process],
+                           sinks:Seq[Sink]
+                         )
+
+object JobDescription extends Logger {
+  val dummy = JobDescription("dummy", Nil, Nil, Nil)
+
+  def apply(jsVal:JsValue):JobDescription = {
+    implicit val TaskConfReader = Json.reads[TaskConf]
+
+    logger.debug(s"JobDescription: ${jsVal}")
+
+    val jobName = (jsVal \ "name").as[String]
+    val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
+    val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
+    val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
+
+    JobDescription(jobName, sources, processes, sinks)
+  }
+
+  def getSource(conf:TaskConf):Source = {
+    conf.`type` match {
+      case "kafka" => new KafkaSource(conf)
+      case "file" => new FileSource(conf)
+      case "hive" => new HiveSource(conf)
+      case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
+    }
+  }
+
+  def getProcess(conf:TaskConf): task.Process = {
+    conf.`type` match {
+      case "sql" => new SqlProcess(conf)
+      case "custom" =>
+        val customClassOpt = conf.options.get("class")
+        customClassOpt match {
+          case Some(customClass:String) =>
+            logger.debug(s"custom class init.. $customClass")
+
+            Class.forName(customClass)
+              .getConstructor(TaskConf.getClass)
+              .newInstance(conf)
+              .asInstanceOf[task.Process]
+
+          case None => throw new IllegalArgumentException(s"custom class name is not exist.. ${conf}")
+        }
+
+      case _ => throw new IllegalArgumentException(s"unsupported process type : ${conf.`type`}")
+    }
+  }
+
+  def getSink(jobName:String, conf:TaskConf):Sink = {
+    conf.`type` match {
+      case "kafka" => new KafkaSink(jobName, conf)
+      case "file" => new FileSink(jobName, conf)
+      case "es" => new ESSink(jobName, conf)
+      case "s2graph" => new S2graphSink(jobName, conf)
+      case _ => throw new IllegalArgumentException(s"unsupported sink type : ${conf.`type`}")
+    }
+
+  }
+}
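
JobDescription.apply expects a JSON document carrying the job name plus "source", "process" and "sink" arrays of task configurations; each entry maps onto TaskConf (name, type, inputs, options). A sketch of such a document, with all four TaskConf fields spelled out (paths, names and the SQL are illustrative, not part of this commit):

import play.api.libs.json.Json
import org.apache.s2graph.s2jobs.JobDescription

val jobDesc = JobDescription(Json.parse(
  """{
    |  "name": "etl_example",
    |  "source":  [{ "name": "wal",     "type": "file", "inputs": [],          "options": { "paths": "/data/wal", "format": "edgeLog" } }],
    |  "process": [{ "name": "friends", "type": "sql",  "inputs": ["wal"],     "options": { "sql": "SELECT * FROM wal WHERE label = 'friend'" } }],
    |  "sink":    [{ "name": "out",     "type": "file", "inputs": ["friends"], "options": { "path": "/data/friends", "format": "parquet" } }]
    |}""".stripMargin))

// jobDesc now holds one FileSource, one SqlProcess and one FileSink, ready for new Job(ss, jobDesc).run()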
${conf}") + } + + case _ => throw new IllegalArgumentException(s"unsupported process type : ${conf.`type`}") + } + } + + def getSink(jobName:String, conf:TaskConf):Sink = { + conf.`type` match { + case "kafka" => new KafkaSink(jobName, conf) + case "file" => new FileSink(jobName, conf) + case "es" => new ESSink(jobName, conf) + case "s2graph" => new S2graphSink(jobName, conf) + case _ => throw new IllegalArgumentException(s"unsupported sink type : ${conf.`type`}") + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala new file mode 100644 index 0000000..d0b355c --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala @@ -0,0 +1,69 @@ +package org.apache.s2graph.s2jobs + +import org.apache.spark.sql.SparkSession +import play.api.libs.json.{JsValue, Json} + +import scala.io.Source + +case class JobOption( + name:String = "S2BatchJob", + confType:String = "db", + jobId:Int = -1, + confFile:String = "" + ) + +object JobLauncher extends Logger { + + def parseArguments(args: Array[String]): JobOption = { + val parser = new scopt.OptionParser[JobOption]("run") { + opt[String]('n', "name").required().action((x, c) => + c.copy(name = x)).text("job display name") + + cmd("file").action((_, c) => c.copy(confType = "file")) + .text("get config from file") + .children( + opt[String]('f', "confFile").required().valueName("<file>").action((x, c) => + c.copy(confFile = x)).text("configuration file") + ) + + cmd("db").action((_, c) => c.copy(confType = "db")) + .text("get config from db") + .children( + opt[String]('i', "jobId").required().valueName("<jobId>").action((x, c) => + c.copy(jobId = x.toInt)).text("configuration file") + ) + } + + parser.parse(args, JobOption()) match { + case Some(o) => o + case None => + parser.showUsage() + throw new IllegalArgumentException(s"failed to parse options... (${args.mkString(",")}") + } + } + + def getConfig(options: JobOption):JsValue = options.confType match { + case "file" => + Json.parse(Source.fromFile(options.confFile).mkString) + case "db" => + throw new IllegalArgumentException(s"'db' option that read config file from database is not supported yet.. 
") + } + + def main(args: Array[String]): Unit = { + + val options = parseArguments(args) + logger.info(s"Job Options : ${options}") + + val jobDescription = JobDescription(getConfig(options)) + + val ss = SparkSession + .builder() + .appName(s"${jobDescription.name}") + .config("spark.driver.maxResultSize", "20g") + .enableHiveSupport() + .getOrCreate() + + val job = new Job(ss, jobDescription) + job.run() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala new file mode 100644 index 0000000..1df55f0 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Logger.scala @@ -0,0 +1,7 @@ +package org.apache.s2graph.s2jobs + +import org.apache.commons.logging.{Log, LogFactory} + +trait Logger { + @transient lazy val logger: Log = LogFactory.getLog(this.getClass) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala new file mode 100644 index 0000000..f214218 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala @@ -0,0 +1,16 @@ +package org.apache.s2graph.s2jobs + +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} + +object Schema { + val BulkLoadSchema = StructType(Seq( + StructField("timestamp", LongType, false), + StructField("operation", StringType, false), + StructField("elem", StringType, false), + StructField("from", StringType, false), + StructField("to", StringType, false), + StructField("label", StringType, false), + StructField("props", StringType, false), + StructField("direction", StringType, true) + )) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala new file mode 100644 index 0000000..73d2fab --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala @@ -0,0 +1,34 @@ +package org.apache.s2graph.s2jobs.task + +import org.apache.spark.sql.{DataFrame, SparkSession} + +/** + * Process + * @param conf + */ +abstract class Process(override val conf:TaskConf) extends Task { + def execute(ss:SparkSession, inputMap:Map[String, DataFrame]):DataFrame +} + +/** + * SqlProcess + * @param conf + */ +class SqlProcess(conf:TaskConf) extends Process(conf) { + override def mandatoryOptions: Set[String] = Set("sql") + + override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { + // create temp table + inputMap.foreach { case (name, df) => + logger.debug(s"${LOG_PREFIX} create temp table : $name") + df.printSchema() + df.createOrReplaceTempView(name) + } + + val sql = conf.options("sql") + logger.debug(s"${LOG_PREFIX} sql : $sql") + + ss.sql(sql) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
new file mode 100644
index 0000000..6249891
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -0,0 +1,165 @@
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
+import org.elasticsearch.spark.sql.EsSparkSQL
+
+/**
+  * Sink
+  * @param queryName
+  * @param conf
+  */
+abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
+  val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}"
+  val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
+
+  val FORMAT:String
+
+  def preprocess(df:DataFrame):DataFrame = df
+
+  def write(inputDF: DataFrame):Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else writeBatch(df.write)
+  }
+
+  protected def writeStream(writer: DataStreamWriter[Row]): Unit = {
+    val partitionsOpt = conf.options.get("partitions")
+    val mode = conf.options.getOrElse("mode", "append") match {
+      case "append" => OutputMode.Append()
+      case "update" => OutputMode.Update()
+      case "complete" => OutputMode.Complete()
+      case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'")
+        OutputMode.Append()
+    }
+    val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
+    val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
+
+    val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
+
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+
+    val query = partitionedWriter
+      .queryName(queryName)
+      .format(FORMAT)
+      .options(cfg)
+      .trigger(Trigger.ProcessingTime(interval))
+      .outputMode(mode)
+      .start()
+
+    query.awaitTermination()
+  }
+
+  protected def writeBatch(writer: DataFrameWriter[Row]): Unit = {
+    val partitionsOpt = conf.options.get("partitions")
+    val mode = conf.options.getOrElse("mode", "overwrite") match {
+      case "overwrite" => SaveMode.Overwrite
+      case "append" => SaveMode.Append
+      case "errorIfExists" => SaveMode.ErrorIfExists
+      case "ignore" => SaveMode.Ignore
+      case _ => SaveMode.Overwrite
+    }
+
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+
+    writeBatchInner(partitionedWriter.format(FORMAT).mode(mode))
+  }
+
+  protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
+    val outputPath = conf.options("path")
+    writer.save(outputPath)
+  }
+
+  protected def repartition(df:DataFrame, defaultParallelism:Int) = {
+    conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
+      case Some(numOfPartitions:Int) =>
+        if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions)
+        else df.coalesce(numOfPartitions)
+      case None => df
+    }
+  }
+}
+
+/**
+  * KafkaSink
+  * @param queryName
+  * @param conf
+  */
+class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic")
+  override val FORMAT: String = "kafka"
+
+  override def preprocess(df:DataFrame):DataFrame = {
+    import org.apache.spark.sql.functions._
+
+    logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
+
+    conf.options.getOrElse("format", "json") match {
+      case "tsv" =>
+        val delimiter = conf.options.getOrElse("delimiter", "\t")
+
+        val columns = df.columns
+        df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value"))
+      case format:String =>
+        if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format")
+        df.selectExpr("to_json(struct(*)) AS value")
+    }
+  }
+
+  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
+    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+}
+
+/**
+  * FileSink
+  * @param queryName
+  * @param conf
+  */
+class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("path", "format")
+  override val FORMAT: String = conf.options.getOrElse("format", "parquet")
+}
+
+/**
+  * HiveSink
+  * @param queryName
+  * @param conf
+  */
+class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("database", "table")
+  override val FORMAT: String = "hive"
+
+  override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
+    val database = conf.options("database")
+    val table = conf.options("table")
+
+    writer.insertInto(s"${database}.${table}")
+  }
+
+  override protected def writeStream(writer: DataStreamWriter[Row]): Unit =
+    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+
+}
+
+/**
+  * ESSink
+  * @param queryName
+  * @param conf
+  */
+class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port")
+  override val FORMAT: String = "es"
+
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(inputDF, inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) writeStream(df.writeStream)
+    else {
+
+      val resource = conf.options("path")
+      EsSparkSQL.saveToEs(df, resource, conf.options)
+    }
+  }
+}
+
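
A sink's options double as its writer configuration: "mode", "interval" and "checkpointLocation" drive the streaming query, while KafkaSink's "format" and "delimiter" control how each row is serialized into the Kafka value column. A sketch of a streaming Kafka sink configuration (broker, topic and paths are illustrative, not part of this commit):

import org.apache.s2graph.s2jobs.task.{KafkaSink, TaskConf}

val kafkaSinkConf = TaskConf(
  name   = "kafkaOut",
  `type` = "kafka",
  inputs = Seq("friends"),
  options = Map(
    "kafka.bootstrap.servers" -> "broker1:9092,broker2:9092",
    "topic"                   -> "etl_output",
    "format"                  -> "tsv",              // rows become delimiter-joined strings in "value"
    "delimiter"               -> "\t",
    "mode"                    -> "append",           // streaming OutputMode
    "interval"                -> "5 seconds",        // processing-time trigger
    "checkpointLocation"      -> "/tmp/etl_example/checkpoint"
  ))

// write() picks writeStream for a streaming DataFrame and blocks on awaitTermination()
val kafkaSink = new KafkaSink("etl_example", kafkaSinkConf)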
${df.schema}") + + conf.options.getOrElse("format", "json") match { + case "tsv" => + val delimiter = conf.options.getOrElse("delimiter", "\t") + + val columns = df.columns + df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value")) + case format:String => + if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format") + df.selectExpr("to_json(struct(*)) AS value") + } + } + + override protected def writeBatch(writer: DataFrameWriter[Row]): Unit = + throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}") +} + +/** + * FileSink + * @param queryName + * @param conf + */ +class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { + override def mandatoryOptions: Set[String] = Set("path", "format") + override val FORMAT: String = conf.options.getOrElse("format", "parquet") +} + +/** + * HiveSink + * @param queryName + * @param conf + */ +class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { + override def mandatoryOptions: Set[String] = Set("database", "table") + override val FORMAT: String = "hive" + + override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = { + val database = conf.options("database") + val table = conf.options("table") + + writer.insertInto(s"${database}.${table}") + } + + override protected def writeStream(writer: DataStreamWriter[Row]): Unit = + throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}") + +} + +/** + * ESSink + * @param queryName + * @param conf + */ +class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { + override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port") + override val FORMAT: String = "es" + + override def write(inputDF: DataFrame): Unit = { + val df = repartition(inputDF, inputDF.sparkSession.sparkContext.defaultParallelism) + + if (inputDF.isStreaming) writeStream(df.writeStream) + else { + + val resource = conf.options("path") + EsSparkSQL.saveToEs(df, resource, conf.options) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala new file mode 100644 index 0000000..c81ac9f --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -0,0 +1,85 @@ +package org.apache.s2graph.s2jobs.task + +import org.apache.spark.sql.{DataFrame, SparkSession} + + +/** + * Source + * + * @param conf + */ +abstract class Source(override val conf:TaskConf) extends Task { + def toDF(ss:SparkSession):DataFrame +} + +class KafkaSource(conf:TaskConf) extends Source(conf) { + val DEFAULT_FORMAT = "raw" + override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "subscribe") + + override def toDF(ss:SparkSession):DataFrame = { + logger.info(s"${LOG_PREFIX} options: ${conf.options}") + + val format = conf.options.getOrElse("format", "raw") + val df = ss.readStream.format("kafka").options(conf.options).load() + + format match { + case "raw" => df + case "json" => parseJsonSchema(ss, df) +// case "custom" => parseCustomSchema(df) + case _ => + logger.warn(s"${LOG_PREFIX} unsupported format '$format'.. 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
new file mode 100644
index 0000000..bd9259f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -0,0 +1,19 @@
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.s2jobs.Logger
+
+case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
+
+trait Task extends Serializable with Logger {
+  val conf:TaskConf
+  val LOG_PREFIX = s"[${this.getClass.getSimpleName}]"
+
+  def mandatoryOptions:Set[String]
+  def isValidate:Boolean = mandatoryOptions.subsetOf(conf.options.keySet)
+
+  require(isValidate, s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}'
+       in task options (${conf.options.keySet.mkString(",")})
+    """.stripMargin)
+
+  println(s"${LOG_PREFIX} init : $conf")
+}
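
Every task validates itself at construction time: mandatoryOptions must be a subset of the keys in TaskConf.options, otherwise the require above fails. A sketch of the failure mode (the task name is illustrative, not part of this commit):

import scala.util.Try
import org.apache.s2graph.s2jobs.task.{SqlProcess, TaskConf}

val missingSql = TaskConf("bad", "sql", Seq("input"), Map.empty)   // no "sql" option
val attempt = Try(new SqlProcess(missingSql))
// attempt is a Failure(IllegalArgumentException) whose message starts with
// "requirement failed: [SqlProcess] not exists mandatory options 'sql'"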

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e0ba0aa7/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
new file mode 100644
index 0000000..adbebac
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
@@ -0,0 +1,29 @@
+package org.apache.s2graph.s2jobs.task
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.scalatest.FunSuite
+
+class ProcessTest extends FunSuite with DataFrameSuiteBase {
+
+  test("SqlProcess execute sql") {
+    import spark.implicits._
+
+    val inputDF = Seq(
+      ("a", "b", "friend"),
+      ("a", "c", "friend"),
+      ("a", "d", "friend")
+    ).toDF("from", "to", "label")
+
+    val inputMap = Map("input" -> inputDF)
+    val sql = "SELECT * FROM input WHERE to = 'b'"
+    val conf = TaskConf("test", "sql", Seq("input"), Map("sql" -> sql))
+
+    val process = new SqlProcess(conf)
+
+    val rstDF = process.execute(spark, inputMap)
+    val tos = rstDF.collect().map{ row => row.getAs[String]("to")}
+
+    assert(tos.size == 1)
+    assert(tos.head == "b")
+  }
+}
