Repository: incubator-griffin Updated Branches: refs/heads/master cbff5b45c -> 06f969fea
Fix bug of record persist in simple mode: the timestamp of records should be the default timestamp in the export config, not the calculation timestamp. Author: Lionel Liu <[email protected]> Closes #192 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/06f969fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/06f969fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/06f969fe Branch: refs/heads/master Commit: 06f969feac9d0016dba0caed0b913e395ef06559 Parents: cbff5b4 Author: Lionel Liu <[email protected]> Authored: Tue Jan 16 16:57:49 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue Jan 16 16:57:49 2018 +0800 ---------------------------------------------------------------------- .../griffin/measure/process/BatchDqProcess.scala | 4 ++-- .../griffin/measure/process/StreamingDqThread.scala | 4 ++-- .../griffin/measure/process/engine/DqEngine.scala | 2 +- .../griffin/measure/process/engine/DqEngines.scala | 16 ++++++++-------- .../measure/process/engine/SparkDqEngine.scala | 3 +-- 5 files changed, 14 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala index 950cd27..44cca9a 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala @@ -116,9 +116,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { dqEngines.runRuleSteps(calcTimeInfo, 
rulePlan.ruleSteps) // persist results - dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, persistFactory) + dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory) - dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, persistFactory, dataSources) + dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources) // dfs.foreach(_._2.cache()) // // dqEngines.persistAllRecords(dfs, persistFactory) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala index fcf9528..c3c4f09 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala @@ -85,7 +85,7 @@ case class StreamingDqThread(sqlContext: SQLContext, // persist results // val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory) - dqEngines.persistAllMetrics(calcTimeInfo, optRulePlan.metricExports, persistFactory) + dqEngines.persistAllMetrics(optRulePlan.metricExports, persistFactory) // println(s"--- timeGroups: ${timeGroups}") val rt = new Date().getTime @@ -93,7 +93,7 @@ case class StreamingDqThread(sqlContext: SQLContext, appPersist.log(rt, persistResultTimeStr) // persist records - dqEngines.persistAllRecords(calcTimeInfo, optRulePlan.recordExports, persistFactory, dataSources) + dqEngines.persistAllRecords(optRulePlan.recordExports, persistFactory, dataSources) val et = new Date().getTime val persistTimeStr = s"persist records using time: ${et - rt} ms" 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala index 00c6ef4..ee3a65e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala @@ -34,7 +34,7 @@ trait DqEngine extends Loggable with Serializable { protected def collectable(): Boolean = false - def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport): Map[Long, Map[String, Any]] + def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] // def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] // http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala index 2163925..8f17764 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala @@ -54,11 +54,11 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { } } - def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport], persistFactory: PersistFactory + def persistAllMetrics(metricExports: Seq[MetricExport], persistFactory: PersistFactory ): Unit = { val allMetrics: Map[Long, Map[String, Any]] = { metricExports.foldLeft(Map[Long, 
Map[String, Any]]()) { (ret, metricExport) => - val metrics = collectMetrics(timeInfo, metricExport) + val metrics = collectMetrics(metricExport) metrics.foldLeft(ret) { (total, pair) => val (k, v) = pair total.get(k) match { @@ -112,7 +112,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { Await.result(pro.future, Duration.Inf) } - def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport], + def persistAllRecords(recordExports: Seq[RecordExport], persistFactory: PersistFactory, dataSources: Seq[DataSource] ): Unit = { // method 1: multi thread persist multi data frame @@ -127,7 +127,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { recordExport.mode match { case SimpleMode => { collectBatchRecords(recordExport).foreach { rdd => - persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory) + persistCollectedBatchRecords(recordExport, rdd, persistFactory) } } case TimestampMode => { @@ -154,10 +154,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { ret } - private def persistCollectedBatchRecords(timeInfo: TimeInfo, recordExport: RecordExport, + private def persistCollectedBatchRecords(recordExport: RecordExport, records: RDD[String], persistFactory: PersistFactory ): Unit = { - val persist = persistFactory.getPersists(timeInfo.calcTime) + val persist = persistFactory.getPersists(recordExport.defTimestamp) persist.persistRecords(records, recordExport.name) } @@ -282,10 +282,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { // engine.collectUpdateCacheDatas(ruleStep, timeGroups) // }.headOption // } - def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport + def collectMetrics(metricExport: MetricExport ): Map[Long, Map[String, Any]] = { val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) => - if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport) + if (ret.nonEmpty) ret else engine.collectMetrics(metricExport) } 
ret } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala index 3bcecdb..736ce56 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala @@ -68,8 +68,7 @@ trait SparkDqEngine extends DqEngine { } } - def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport - ): Map[Long, Map[String, Any]] = { + def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] = { if (collectable) { val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport try {
