Repository: incubator-griffin Updated Branches: refs/heads/master db3e6ec0d -> 9ae78689c
enable the option for streaming job clear checkpoint directory before process Author: Lionel Liu <[email protected]> Closes #237 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/9ae78689 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/9ae78689 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/9ae78689 Branch: refs/heads/master Commit: 9ae78689c6b49fce5faa51cad51ed2ab70f0b160 Parents: db3e6ec Author: Lionel Liu <[email protected]> Authored: Thu Mar 22 16:29:33 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Thu Mar 22 16:29:33 2018 +0800 ---------------------------------------------------------------------- .../measure/config/params/env/SparkParam.scala | 5 ++++- .../griffin/measure/process/StreamingDqProcess.scala | 15 ++++++++++++--- measure/src/test/resources/env-test.json | 3 ++- 3 files changed, 18 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9ae78689/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala index 6ec0955..a21a64f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala @@ -27,7 +27,10 @@ case class SparkParam( @JsonProperty("log.level") logLevel: String, @JsonProperty("checkpoint.dir") cpDir: String, @JsonProperty("batch.interval") batchInterval: String, @JsonProperty("process.interval") processInterval: String, - @JsonProperty("config") config: Map[String, String] + @JsonProperty("config") config: Map[String, String], + @JsonProperty("init.clear") initClear: Boolean ) extends Param { + def needInitClear: Boolean = if (initClear != null) initClear else false + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9ae78689/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala index 3c2376a..b2af46a 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala @@ -28,7 +28,7 @@ import org.apache.griffin.measure.process.engine.DqEngineFactory import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup import org.apache.griffin.measure.rule.udf._ -import org.apache.griffin.measure.utils.TimeUtil +import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveContext import org.apache.spark.streaming.{Milliseconds, StreamingContext} @@ -58,6 +58,9 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess { sparkContext.setLogLevel(sparkParam.logLevel) sqlContext = new HiveContext(sparkContext) + // clear checkpoint directory + clearCpDir + // init info cache instance InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName) InfoCacheInstance.init @@ -159,9 +162,15 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess { val ssc = new StreamingContext(sparkContext, batchInterval) ssc.checkpoint(sparkParam.cpDir) - - ssc } + private def clearCpDir: Unit = { + if (sparkParam.needInitClear) { + val cpDir = sparkParam.cpDir + println(s"clear checkpoint directory ${cpDir}") + HdfsUtil.deleteHdfsPath(cpDir) + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9ae78689/measure/src/test/resources/env-test.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-test.json b/measure/src/test/resources/env-test.json index 898d579..4a8e3d0 100644 --- a/measure/src/test/resources/env-test.json +++ b/measure/src/test/resources/env-test.json @@ -6,7 +6,8 @@ "process.interval": "10m", "config": { "spark.master": "local[*]" - } + }, + "init.clear": true }, "persist": [
