wait for termination of multiple streaming queries

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a3227fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a3227fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a3227fd0

Branch: refs/heads/master
Commit: a3227fd07e69c5b3dcc868e8c3b8ab43e5d0956e
Parents: 33c6fd8
Author: Chul Kang <[email protected]>
Authored: Tue Mar 27 21:28:20 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:03:21 2018 +0900

----------------------------------------------------------------------
 s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala |  4 +++-
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala  | 10 ++++------
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a3227fd0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index b8fccc2..beef135 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -22,6 +22,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
     } while(processRst.nonEmpty)
 
     logger.debug(s"valid named DF set : ${dfMap.keySet}")
+
     // sinks
     jobDesc.sinks.foreach { s =>
       val inputDFs = s.conf.inputs.flatMap{ input => dfMap.get(input)}
@@ -30,7 +31,8 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
       // use only first input
       s.write(inputDFs.head)
     }
-
+    // if streaming queries exist, wait until any of them terminates
+    if (ss.streams.active.length > 0) ss.streams.awaitAnyTermination()
   }
 
  private def getValidProcess(processes:Seq[Process]):Seq[(String, DataFrame)] = {
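
For context: before this change, each streaming Sink called query.awaitTermination() inside its own write(), so the first streaming sink blocked Job.run and any later sinks never started. Hoisting the wait into Job as ss.streams.awaitAnyTermination() lets every query start before anything blocks. A minimal self-contained sketch of the pattern (the "rate" source, sink names, and checkpoint paths are illustrative assumptions, not part of this commit):

  import org.apache.spark.sql.SparkSession

  object MultiStreamSketch {
    def main(args: Array[String]): Unit = {
      val ss = SparkSession.builder().appName("multi-stream-sketch").getOrCreate()

      // Start one streaming query per "sink" without blocking, mirroring
      // the revised Sink.writeStream: start() only, no awaitTermination().
      def startQuery(name: String): Unit =
        ss.readStream.format("rate").load()
          .writeStream
          .queryName(name)
          .format("console")
          .option("checkpointLocation", s"/tmp/streamingjob/sketch/$name")
          .start()

      startQuery("sink-a")
      startQuery("sink-b") // would never start if sink-a blocked inside write()

      // As Job.run now does: block until any active query terminates.
      if (ss.streams.active.length > 0) ss.streams.awaitAnyTermination()
    }
  }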

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a3227fd0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 0b83a6a..c15ed52 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -10,7 +10,7 @@ import org.elasticsearch.spark.sql.EsSparkSQL
   * @param conf
   */
 abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
-  val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}"
+  val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}"
   val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
 
   val FORMAT:String
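
Why the checkpoint path now includes conf.name: Structured Streaming needs a distinct checkpointLocation per query, and a job running several streaming sinks previously defaulted them all to the same /tmp/streamingjob/${queryName} directory, which concurrent queries cannot safely share. A small illustration (the job and task names here are hypothetical):

  val queryName = "job1"                 // hypothetical job-level query name
  Seq("sink-a", "sink-b").foreach { taskName =>
    // Each sink now checkpoints into its own directory instead of colliding.
    println(s"/tmp/streamingjob/${queryName}/${taskName}")
  }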
@@ -38,17 +38,15 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
 
     val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
 
-    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get.split(","): _*) else writer
 
-    val query = partitionedWriter
-      .queryName(queryName)
+    partitionedWriter
+      .queryName(s"${queryName}_${conf.name}")
       .format(FORMAT)
       .options(cfg)
       .trigger(Trigger.ProcessingTime(interval))
       .outputMode(mode)
       .start()
-
-    query.awaitTermination()
   }
 
   protected def writeBatch(writer: DataFrameWriter[Row]): Unit = {
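
Two further fixes land in this hunk. First, partitionBy previously received the whole comma-separated option value as a single column name; splitting on "," passes the columns as varargs. Second, names must be unique among active streaming queries, hence the ${queryName}_${conf.name} suffix, and the per-query awaitTermination() is deleted in favor of the single awaitAnyTermination() added in Job.scala. A hedged sketch of the partitionBy fix (assuming the option holds a comma-separated column list such as "year,month"):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.streaming.DataStreamWriter

  def partitioned(writer: DataStreamWriter[Row],
                  partitionsOpt: Option[String]): DataStreamWriter[Row] =
    partitionsOpt match {
      // Before: writer.partitionBy("year,month") looked for one column
      // literally named "year,month". After: split into varargs columns.
      case Some(cols) => writer.partitionBy(cols.split(","): _*)
      case None       => writer
    }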
