wait termination for multiple streaming queries
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a3227fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a3227fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a3227fd0

Branch: refs/heads/master
Commit: a3227fd07e69c5b3dcc868e8c3b8ab43e5d0956e
Parents: 33c6fd8
Author: Chul Kang <[email protected]>
Authored: Tue Mar 27 21:28:20 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:03:21 2018 +0900

----------------------------------------------------------------------
 s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala  |  4 +++-
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala   | 10 ++++------
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a3227fd0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index b8fccc2..beef135 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -22,6 +22,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
     } while(processRst.nonEmpty)
 
     logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    // sinks
     jobDesc.sinks.foreach { s =>
       val inputDFs = s.conf.inputs.flatMap{ input => dfMap.get(input)}
 
@@ -30,7 +31,8 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
       // use only first input
       s.write(inputDFs.head)
     }
-
+    // if stream query exist
+    if (ss.streams.active.length > 0) ss.streams.awaitAnyTermination()
   }
 
   private def getValidProcess(processes:Seq[Process]):Seq[(String, DataFrame)] = {


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a3227fd0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 0b83a6a..c15ed52 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -10,7 +10,7 @@ import org.elasticsearch.spark.sql.EsSparkSQL
   * @param conf
   */
 abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
-  val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}"
+  val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}"
   val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
 
   val FORMAT:String
@@ -38,17 +38,15 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
 
     val cfg = conf.options ++ Map("checkpointLocation" -> checkpointLocation)
 
-    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get) else writer
+    val partitionedWriter = if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get.split(","): _*) else writer
 
-    val query = partitionedWriter
-      .queryName(queryName)
+    partitionedWriter
+      .queryName(s"${queryName}_${conf.name}")
       .format(FORMAT)
       .options(cfg)
       .trigger(Trigger.ProcessingTime(interval))
       .outputMode(mode)
       .start()
-
-    query.awaitTermination()
   }
 
   protected def writeBatch(writer: DataFrameWriter[Row]): Unit = {
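
For context, the Job.scala change works because DataStreamWriter.start() is non-blocking: all sinks start their queries first, and the driver then blocks once on the whole set. The previous per-sink query.awaitTermination() blocked inside the first write(), so any later streaming sink never started. Below is a minimal, self-contained sketch of the same pattern; the "rate" source, console format, query names, and checkpoint paths are illustrative stand-ins, not taken from s2jobs.

import org.apache.spark.sql.SparkSession

object MultiStreamExample {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().appName("multi-stream-example").getOrCreate()

    // Any streaming source works; "rate" is a built-in test source.
    val source = ss.readStream.format("rate").load()

    // start() returns immediately, so both queries run concurrently.
    // Each query gets a unique name and its own checkpoint directory.
    source.writeStream
      .queryName("job_sinkA")
      .format("console")
      .option("checkpointLocation", "/tmp/streamingjob/job/sinkA")
      .start()

    source.writeStream
      .queryName("job_sinkB")
      .format("console")
      .option("checkpointLocation", "/tmp/streamingjob/job/sinkB")
      .start()

    // Block the driver until any active query terminates, mirroring the
    // check added at the end of the sink loop in Job.scala above.
    if (ss.streams.active.length > 0) ss.streams.awaitAnyTermination()
  }
}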

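On the Sink.scala side, two details matter once several queries run at once: DataStreamWriter.partitionBy takes column names as varargs, so a comma-separated option string has to be split before being splatted with ": _*", and concurrent queries must not share a query name or checkpoint location, which is why conf.name is appended to both. A rough sketch under assumed values; queryName, sinkName, the dt/hh columns, and the output paths are made up for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format}
import org.apache.spark.sql.streaming.Trigger

object SinkPartitionExample {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().appName("sink-partition-example").getOrCreate()

    val queryName = "myJob"      // illustrative stand-ins for the task config
    val sinkName  = "fileSink"
    val partitionsOpt: Option[String] = Some("dt,hh")

    // Derive the partition columns so the example is self-contained.
    val df = ss.readStream.format("rate").load()
      .withColumn("dt", date_format(col("timestamp"), "yyyyMMdd"))
      .withColumn("hh", date_format(col("timestamp"), "HH"))

    val writer = df.writeStream
    // partitionBy(colNames: String*) is variadic: "dt,hh" must become
    // Seq("dt", "hh") before being passed, as in the fixed Sink.scala.
    val partitionedWriter =
      if (partitionsOpt.isDefined) writer.partitionBy(partitionsOpt.get.split(","): _*)
      else writer

    partitionedWriter
      .queryName(s"${queryName}_${sinkName}")  // unique name per concurrent query
      .format("parquet")
      .option("path", s"/tmp/out/${queryName}/${sinkName}")
      .option("checkpointLocation", s"/tmp/streamingjob/${queryName}/${sinkName}")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode("append")
      .start()
  }
}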