add dataframe cache option
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d93e5de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d93e5de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d93e5de4

Branch: refs/heads/master
Commit: d93e5de4a89d5fbf195acb09d467b103f564a439
Parents: 63727f3
Author: Chul Kang <[email protected]>
Authored: Fri Jun 8 00:07:50 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Fri Jun 8 00:07:50 2018 +0900

----------------------------------------------------------------------
 .../src/main/scala/org/apache/s2graph/s2jobs/Job.scala   | 11 +++++++++--
 .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala |  3 +--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d93e5de4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 717d087..8f21bc2 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -29,7 +29,12 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
 
   def run() = {
     // source
-    jobDesc.sources.foreach{ source => dfMap.put(source.conf.name, source.toDF(ss))}
+    jobDesc.sources.foreach{ source =>
+      val df = source.toDF(ss)
+      if (source.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+
+      dfMap.put(source.conf.name, df)
+    }
     logger.debug(s"valid source DF set : ${dfMap.keySet}")
 
     // process
@@ -64,7 +69,9 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
         }
         .map { p =>
           val inputMap = p.conf.inputs.map{ input => (input, dfMap(input)) }.toMap
-          p.conf.name -> p.execute(ss, inputMap)
+          val df = p.execute(ss, inputMap)
+          if (p.conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
+          p.conf.name -> df
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d93e5de4/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ea42828..1210132 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,8 +42,7 @@ object TaskConf {
     taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
   }
 }
-
-case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
+case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty, cache:Option[Boolean]=None)
 
 trait Task extends Serializable with Logger {
   val conf: TaskConf
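
A minimal sketch of how the new `cache` flag might be exercised. The TaskConf
signature and the caching guard come from this commit; the task name, type,
options, and the maybeCache helper below are hypothetical, for illustration
only:

    import org.apache.spark.sql.DataFrame

    // Hypothetical task configuration enabling the new cache flag;
    // the name/type/options values are illustrative only.
    val conf = TaskConf(
      name = "friends",
      `type` = "hdfs",
      options = Map("paths" -> "/data/friends"),
      cache = Some(true)
    )

    // Mirrors the guard added in Job.run(): cache only when the flag is
    // set and the DataFrame is not streaming, since cache() is not
    // supported on streaming Datasets.
    def maybeCache(conf: TaskConf, df: DataFrame): DataFrame = {
      if (conf.cache.getOrElse(false) && !df.isStreaming) df.cache()
      df
    }

Because `cache` defaults to None, existing job descriptions that omit the
field keep their current (uncached) behavior.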
