Repository: incubator-griffin
Updated Branches:
  refs/heads/master db3e6ec0d -> 9ae78689c


Enable an option for streaming jobs to clear the checkpoint directory before processing

Author: Lionel Liu <[email protected]>

Closes #237 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/9ae78689
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/9ae78689
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/9ae78689

Branch: refs/heads/master
Commit: 9ae78689c6b49fce5faa51cad51ed2ab70f0b160
Parents: db3e6ec
Author: Lionel Liu <[email protected]>
Authored: Thu Mar 22 16:29:33 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Thu Mar 22 16:29:33 2018 +0800

----------------------------------------------------------------------
 .../measure/config/params/env/SparkParam.scala       |  5 ++++-
 .../griffin/measure/process/StreamingDqProcess.scala | 15 ++++++++++++---
 measure/src/test/resources/env-test.json             |  3 ++-
 3 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9ae78689/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
index 6ec0955..a21a64f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
@@ -27,7 +27,10 @@ case class SparkParam( @JsonProperty("log.level") logLevel: String,
                        @JsonProperty("checkpoint.dir") cpDir: String,
                        @JsonProperty("batch.interval") batchInterval: String,
                        @JsonProperty("process.interval") processInterval: String,
-                       @JsonProperty("config") config: Map[String, String]
+                       @JsonProperty("config") config: Map[String, String],
+                       @JsonProperty("init.clear") initClear: Boolean
                      ) extends Param {
 
+  def needInitClear: Boolean = if (initClear != null) initClear else false
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9ae78689/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index 3c2376a..b2af46a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -28,7 +28,7 @@ import org.apache.griffin.measure.process.engine.DqEngineFactory
 import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
 import org.apache.griffin.measure.rule.udf._
-import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
@@ -58,6 +58,9 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     sparkContext.setLogLevel(sparkParam.logLevel)
     sqlContext = new HiveContext(sparkContext)
 
+    // clear checkpoint directory
+    clearCpDir
+
     // init info cache instance
     InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
     InfoCacheInstance.init
@@ -159,9 +162,15 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     val ssc = new StreamingContext(sparkContext, batchInterval)
     ssc.checkpoint(sparkParam.cpDir)
 
-
-
     ssc
   }
 
+  private def clearCpDir: Unit = {
+    if (sparkParam.needInitClear) {
+      val cpDir = sparkParam.cpDir
+      println(s"clear checkpoint directory ${cpDir}")
+      HdfsUtil.deleteHdfsPath(cpDir)
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9ae78689/measure/src/test/resources/env-test.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-test.json b/measure/src/test/resources/env-test.json
index 898d579..4a8e3d0 100644
--- a/measure/src/test/resources/env-test.json
+++ b/measure/src/test/resources/env-test.json
@@ -6,7 +6,8 @@
     "process.interval": "10m",
     "config": {
       "spark.master": "local[*]"
-    }
+    },
+    "init.clear": true
   },
 
   "persist": [

Reply via email to