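Fix bug in Job process selection: require that all of a process's inputs exist (use forall instead of a flag overwritten in foreach, which only reflected the last input); log the valid DF sets at info level instead of debug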
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/34b46f6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/34b46f6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/34b46f6e Branch: refs/heads/master Commit: 34b46f6e4da9da793c5055ce38ed3b1202174eca Parents: 0ae2cb2 Author: Chul Kang <[email protected]> Authored: Fri Jul 27 18:50:38 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Fri Jul 27 18:50:38 2018 +0900 ---------------------------------------------------------------------- s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/34b46f6e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala index 8f21bc2..026b688 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala @@ -35,7 +35,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log dfMap.put(source.conf.name, df) } - logger.debug(s"valid source DF set : ${dfMap.keySet}") + logger.info(s"valid source DF set : ${dfMap.keySet}") // process var processRst:Seq[(String, DataFrame)] = Nil @@ -45,7 +45,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log } while(processRst.nonEmpty) - logger.debug(s"valid named DF set : ${dfMap.keySet}") + logger.info(s"valid named DF set : ${dfMap.keySet}") // sinks jobDesc.sinks.foreach { s => @@ -63,8 +63,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log val dfKeys 
= dfMap.keySet processes.filter{ p => - var existAllInput = true - p.conf.inputs.foreach { input => existAllInput = dfKeys(input) } + val existAllInput = p.conf.inputs.forall{ input => dfKeys(input) } !dfKeys(p.conf.name) && existAllInput } .map { p =>
