fix bug: require all process inputs to exist (the foreach flag only kept the last check); log valid DF sets at info level

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/34b46f6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/34b46f6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/34b46f6e

Branch: refs/heads/master
Commit: 34b46f6e4da9da793c5055ce38ed3b1202174eca
Parents: 0ae2cb2
Author: Chul Kang <[email protected]>
Authored: Fri Jul 27 18:50:38 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Fri Jul 27 18:50:38 2018 +0900

----------------------------------------------------------------------
 s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/34b46f6e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..026b688 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -35,7 +35,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
 
       dfMap.put(source.conf.name, df)
     }
-    logger.debug(s"valid source DF set : ${dfMap.keySet}")
+    logger.info(s"valid source DF set : ${dfMap.keySet}")
 
     // process
     var processRst:Seq[(String, DataFrame)] = Nil
@@ -45,7 +45,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
 
     } while(processRst.nonEmpty)
 
-    logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    logger.info(s"valid named DF set : ${dfMap.keySet}")
 
     // sinks
     jobDesc.sinks.foreach { s =>
@@ -63,8 +63,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
     val dfKeys = dfMap.keySet
 
     processes.filter{ p =>
-        var existAllInput = true
-        p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+        val existAllInput = p.conf.inputs.forall{ input => dfKeys(input) }
         !dfKeys(p.conf.name) && existAllInput
     }
     .map { p =>

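For context, a minimal standalone sketch of why the old filter was wrong: the foreach loop overwrote existAllInput on every iteration, so only the last input decided the result, while forall is true only when every input is present. The object name and the sample values below are assumptions for illustration; only dfKeys, inputs, and the two check expressions mirror the diff.

object InputCheckSketch {
  def main(args: Array[String]): Unit = {
    // Names of DataFrames already registered (analogous to dfMap.keySet in Job.scala).
    val dfKeys: Set[String] = Set("source_a", "source_b")

    // A process whose first input is missing but whose last input exists.
    val inputs: Seq[String] = Seq("missing_input", "source_a")

    // Old logic: the flag is overwritten each iteration, so only the LAST input counts.
    var existAllInputOld = true
    inputs.foreach { input => existAllInputOld = dfKeys(input) }

    // Fixed logic: forall is true only if EVERY input is registered.
    val existAllInputNew = inputs.forall { input => dfKeys(input) }

    println(s"old check: $existAllInputOld") // true  (incorrectly admits the process)
    println(s"new check: $existAllInputNew") // false (correctly rejects it)
  }
}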