Repository: incubator-griffin
Updated Branches:
  refs/heads/master 5507f5754 -> 168ebdfe9


fix distinctness bug: take begin tmst from the source time range's begin (not its end), and use the end tmst for metric and record exports

Author: Lionel Liu <[email protected]>

Closes #225 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/168ebdfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/168ebdfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/168ebdfe

Branch: refs/heads/master
Commit: 168ebdfe9e86cd4c1820f3e7613d5a73a8c9f18e
Parents: 5507f57
Author: Lionel Liu <[email protected]>
Authored: Tue Feb 27 16:14:37 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Tue Feb 27 16:14:37 2018 +0800

----------------------------------------------------------------------
 .../docker/svc_msr/docker-compose-batch.yml     |  2 ++
 .../docker/svc_msr/docker-compose-streaming.yml |  2 ++
 .../rule/trans/DistinctnessRulePlanTrans.scala  | 22 +++++++++-----------
 3 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/168ebdfe/griffin-doc/docker/svc_msr/docker-compose-batch.yml
----------------------------------------------------------------------
diff --git a/griffin-doc/docker/svc_msr/docker-compose-batch.yml b/griffin-doc/docker/svc_msr/docker-compose-batch.yml
index fb14072..6c1cd49 100644
--- a/griffin-doc/docker/svc_msr/docker-compose-batch.yml
+++ b/griffin-doc/docker/svc_msr/docker-compose-batch.yml
@@ -22,6 +22,8 @@ griffin:
     - es
   environment:
     ES_HOSTNAME: es
+  volumes:
+    - /var/lib/mysql
   ports:
     - 32122:2122
     - 38088:8088

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/168ebdfe/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
----------------------------------------------------------------------
diff --git a/griffin-doc/docker/svc_msr/docker-compose-streaming.yml b/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
index 22110ee..3c5280f 100644
--- a/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
+++ b/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
@@ -26,6 +26,8 @@ griffin:
     ES_HOSTNAME: es
     ZK_HOSTNAME: zk
     KAFKA_HOSTNAME: kafka
+  volumes:
+    - /var/lib/mysql
   ports:
     - 32122:2122
     - 38088:8088

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/168ebdfe/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
index 7820d0c..1ec970b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
@@ -62,16 +62,14 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
 
     val ct = timeInfo.calcTime
 
-//    val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt)
-//    val beginTmst = beginTmstOpt match {
-//      case Some(t) => t
-//      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
-//    }
-    val beginTmstOpt = dsTimeRanges.get(sourceName).map(_.end)
-    val beginTmst = beginTmstOpt match {
+    val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match {
       case Some(t) => t
       case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
     }
+    val endTmst = dsTimeRanges.get(sourceName).map(_.end) match {
+      case Some(t) => t
+      case _ => throw new Exception(s"empty end tmst from ${sourceName}")
+    }
 
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       println(s"[${ct}] data source ${sourceName} not exists")
@@ -104,7 +102,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
       }
       val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
       val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, beginTmst, mode)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, endTmst, mode)
 
       // 3. group by self
       val selfGroupTableName = "__selfGroup"
@@ -137,7 +135,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
           // 4. older alias
           val olderAliasTableName = "__older"
           val olderAliasSql = {
-            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${InternalColumns.tmst}` < ${beginTmst}"
+            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${InternalColumns.tmst}` <= ${beginTmst}"
           }
           val olderAliasStep = SparkSqlStep(olderAliasTableName, 
olderAliasSql, emptyMap)
 
@@ -211,7 +209,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
       }
       val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
       val distMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, beginTmst, mode)
+      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, endTmst, mode)
 
       val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: 
Nil)
 
@@ -231,7 +229,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
         }
         val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
         val dupRecordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTmst, mode)
+        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, endTmst, mode)
 
         // 10. duplicate metric
         val dupMetricTableName = "__dupMetric"
@@ -244,7 +242,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
         }
         val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
         val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, beginTmst, mode)
+        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, endTmst, mode)
 
         RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
       } else emptyRulePlan

Reply via email to