Repository: incubator-griffin Updated Branches: refs/heads/master 5507f5754 -> 168ebdfe9
fix distinctness bug of source begin tmst Author: Lionel Liu <[email protected]> Closes #225 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/168ebdfe Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/168ebdfe Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/168ebdfe Branch: refs/heads/master Commit: 168ebdfe9e86cd4c1820f3e7613d5a73a8c9f18e Parents: 5507f57 Author: Lionel Liu <[email protected]> Authored: Tue Feb 27 16:14:37 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue Feb 27 16:14:37 2018 +0800 ---------------------------------------------------------------------- .../docker/svc_msr/docker-compose-batch.yml | 2 ++ .../docker/svc_msr/docker-compose-streaming.yml | 2 ++ .../rule/trans/DistinctnessRulePlanTrans.scala | 22 +++++++++----------- 3 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/168ebdfe/griffin-doc/docker/svc_msr/docker-compose-batch.yml ---------------------------------------------------------------------- diff --git a/griffin-doc/docker/svc_msr/docker-compose-batch.yml b/griffin-doc/docker/svc_msr/docker-compose-batch.yml index fb14072..6c1cd49 100644 --- a/griffin-doc/docker/svc_msr/docker-compose-batch.yml +++ b/griffin-doc/docker/svc_msr/docker-compose-batch.yml @@ -22,6 +22,8 @@ griffin: - es environment: ES_HOSTNAME: es + volumes: + - /var/lib/mysql ports: - 32122:2122 - 38088:8088 http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/168ebdfe/griffin-doc/docker/svc_msr/docker-compose-streaming.yml ---------------------------------------------------------------------- diff --git a/griffin-doc/docker/svc_msr/docker-compose-streaming.yml b/griffin-doc/docker/svc_msr/docker-compose-streaming.yml index 22110ee..3c5280f 100644 --- a/griffin-doc/docker/svc_msr/docker-compose-streaming.yml +++ b/griffin-doc/docker/svc_msr/docker-compose-streaming.yml @@ -26,6 +26,8 @@ griffin: ES_HOSTNAME: es ZK_HOSTNAME: zk KAFKA_HOSTNAME: kafka + volumes: + - /var/lib/mysql ports: - 32122:2122 - 38088:8088 http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/168ebdfe/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala index 7820d0c..1ec970b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala @@ -62,16 +62,14 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], val ct = timeInfo.calcTime -// val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt) -// val beginTmst = beginTmstOpt match { -// case Some(t) => t -// case _ => throw new Exception(s"empty begin tmst from ${sourceName}") -// } - val beginTmstOpt = dsTimeRanges.get(sourceName).map(_.end) - val beginTmst = beginTmstOpt match { + val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match { case Some(t) => t case _ => throw new Exception(s"empty begin tmst from ${sourceName}") } + val endTmst = dsTimeRanges.get(sourceName).map(_.end) match { + case Some(t) => t + case _ => throw new Exception(s"empty end tmst from ${sourceName}") + } if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { println(s"[${ct}] data source ${sourceName} not exists") @@ -104,7 +102,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTmst, mode) + val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, endTmst, mode) // 3. group by self val selfGroupTableName = "__selfGroup" @@ -137,7 +135,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], // 4. older alias val olderAliasTableName = "__older" val olderAliasSql = { - s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTmst}" + s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` <= ${beginTmst}" } val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap) @@ -211,7 +209,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val distStep = SparkSqlStep(distTableName, distSql, emptyMap) val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTmst, mode) + val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, endTmst, mode) val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil) @@ -231,7 +229,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, beginTmst, mode) + val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, endTmst, mode) // 10. duplicate metric val dupMetricTableName = "__dupMetric" @@ -244,7 +242,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], } val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTmst, mode) + val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, endTmst, mode) RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) } else emptyRulePlan
