Repository: incubator-griffin
Updated Branches:
  refs/heads/master a92697225 -> d70751710


fix tmst range bug in streaming mode: change time range semantics from [a, b) to (a, b]

Author: Lionel Liu <bhlx3l...@163.com>

Closes #209 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/d7075171
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/d7075171
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/d7075171

Branch: refs/heads/master
Commit: d70751710602ec6ffec0790a3b45fda3d345bc02
Parents: a926972
Author: Lionel Liu <bhlx3l...@163.com>
Authored: Mon Feb 5 12:36:15 2018 +0800
Committer: Lionel Liu <bhlx3l...@163.com>
Committed: Mon Feb 5 12:36:15 2018 +0800

----------------------------------------------------------------------
 .../data/source/cache/DataSourceCache.scala     | 35 +++++++++++++-------
 .../measure/process/StreamingDqThread.scala     |  2 +-
 .../rule/trans/DistinctnessRulePlanTrans.scala  |  5 +--
 3 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
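The core of the change is the interval convention used when reading cached data by tmst: the old code filtered a half-open range [a, b), while the fix filters a half-closed range (a, b] and treats the degenerate case a == b as a single timestamp. Below is a minimal sketch of the two filter styles (illustrative only; the sample timestamps and helper names are assumptions, not Griffin code):

    // Sketch: old vs. new tmst range filtering (hypothetical helper names).
    object TmstRangeSketch {
      // old behavior: half-open interval [from, until)
      def filterFromUntil(tmsts: Seq[Long], from: Long, until: Long): Seq[Long] =
        tmsts.filter(t => t >= from && t < until)

      // new behavior: half-closed interval (after, til], with a single-point
      // case when both bounds coincide
      def filterAfterTil(tmsts: Seq[Long], after: Long, til: Long): Seq[Long] =
        if (after == til) tmsts.filter(_ == til)
        else tmsts.filter(t => t > after && t <= til)

      def main(args: Array[String]): Unit = {
        val tmsts = Seq(100L, 200L, 300L, 400L)
        println(filterFromUntil(tmsts, 100L, 300L)) // List(100, 200): upper bound excluded
        println(filterAfterTil(tmsts, 100L, 300L))  // List(200, 300): lower bound excluded
      }
    }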


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d7075171/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index 419b141..d61f294 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -40,12 +40,14 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
   val index: Int
 
   var tmstCache: TmstCache = _
-  protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until)
+  protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.range(from, until)
   protected def clearTmst(t: Long) = tmstCache.remove(t)
   protected def clearTmstsUntil(until: Long) = {
     val outDateTmsts = tmstCache.until(until)
     tmstCache.remove(outDateTmsts)
   }
+  protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
+  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
 
   val _FilePath = "file.path"
   val _InfoPath = "info.path"
@@ -127,13 +129,18 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
 
   // read new cache data and old cache data
   def readData(): (Option[DataFrame], TimeRange) = {
-    // time range: [a, b)
+    // time range: (a, b]
     val timeRange = TimeInfoCache.getTimeRange
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2 + 1)
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
 
     // read partition info
-    val filterStr = s"`${InternalColumns.tmst}` >= ${reviseTimeRange._1} AND `${InternalColumns.tmst}` < ${reviseTimeRange._2}"
-    println(s"read time range: [${reviseTimeRange._1}, ${reviseTimeRange._2})")
+    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
+      println(s"read time range: [${reviseTimeRange._1}]")
+      s"`${InternalColumns.tmst}` = ${reviseTimeRange._1}"
+    } else {
+      println(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
+      s"`${InternalColumns.tmst}` > ${reviseTimeRange._1} AND `${InternalColumns.tmst}` <= ${reviseTimeRange._2}"
+    }
 
     // new cache data
     val newDfOpt = try {
@@ -167,7 +174,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
 
     // from until tmst range
     val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
-    val tmstSet = rangeTmsts(from, until)
+    val tmstSet = afterTilRangeTmsts(from, until)
 
     val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
     (cacheDfOpt, retTimeRange)
@@ -184,14 +191,14 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
   }
 
   private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String]): Unit = {
-    val earlierPaths = listEarlierPartitions(path: String, outTime, partitionOpt)
+    val earlierOrEqPaths = listEarlierOrEqPartitions(path: String, outTime, partitionOpt)
     // delete out time data path
-    earlierPaths.foreach { path =>
+    earlierOrEqPaths.foreach { path =>
       println(s"delete hdfs path: ${path}")
       HdfsUtil.deleteHdfsPath(path)
     }
   }
-  private def listEarlierPartitions(path: String, bound: Long, partitionOpt: Option[String]): Iterable[String] = {
+  private def listEarlierOrEqPartitions(path: String, bound: Long, partitionOpt: Option[String]): Iterable[String] = {
     val names = HdfsUtil.listSubPathsByType(path, "dir")
     val regex = partitionOpt match {
       case Some(partition) => s"^${partition}=(\\d+)$$".r
@@ -201,7 +208,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
       name match {
         case regex(value) => {
           str2Long(value) match {
-            case Some(t) => (t < bound)
+            case Some(t) => (t <= bound)
             case _ => false
           }
         }
@@ -219,6 +226,10 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
 
   // clean out time from new cache data and old cache data
   def cleanOutTimeData(): Unit = {
+    // clean tmst
+    val cleanTime = readCleanTime
+    cleanTime.foreach(clearTmstsTil(_))
+
     if (!readOnly) {
       // new cache data
       val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
@@ -287,7 +298,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
               val cleanTime = readCleanTime
               val updateDf = cleanTime match {
                 case Some(ct) => {
-                  val filterStr = s"`${InternalColumns.tmst}` >= ${ct}"
+                  val filterStr = s"`${InternalColumns.tmst}` > ${ct}"
                   df.filter(filterStr)
                 }
                 case _ => df
@@ -326,7 +337,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
     submitLastProcTime(timeRange._2)
 
     // next clean time
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1 + 1
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
     submitCleanTime(nextCleanTime)
   }
 

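The new helpers afterTilRangeTmsts and clearTmstsTil express the inclusive upper bound on top of the existing exclusive-until TmstCache API by shifting the integral bounds up by one. A minimal sketch of that mapping over a plain sorted set (SortedSet stands in for TmstCache here as an assumption for illustration, not the actual cache implementation):

    import scala.collection.immutable.SortedSet

    // Sketch: expressing (after, til] via a [from, until) range API.
    object TmstBoundsSketch {
      val tmsts: SortedSet[Long] = SortedSet(100L, 200L, 300L, 400L)

      // existing style: [from, until)
      def fromUntil(from: Long, until: Long): SortedSet[Long] = tmsts.range(from, until)

      // new helper style: (after, til], shifted by one onto the existing API
      def afterTil(after: Long, til: Long): SortedSet[Long] = fromUntil(after + 1, til + 1)

      def main(args: Array[String]): Unit = {
        println(fromUntil(100L, 300L)) // TreeSet(100, 200)
        println(afterTil(100L, 300L))  // TreeSet(200, 300)
      }
    }
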
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d7075171/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index dc49df0..f67724f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -138,7 +138,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
   private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
     val timeRangesStr = timeRanges.map { pair =>
       val (name, timeRange) = pair
-      s"${name} -> [${timeRange.begin}, ${timeRange.end})"
+      s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
     }.mkString(", ")
     println(s"data source timeRanges: ${timeRangesStr}")
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d7075171/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
index 40a8102..f45911f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
@@ -61,9 +61,6 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
 
     val ct = timeInfo.calcTime
 
-    val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct))
-    val beginTime = sourceTimeRange.begin
-
     val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt)
     val beginTmst = beginTmstOpt match {
       case Some(t) => t
@@ -126,7 +123,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
           // 4. older alias
           val olderAliasTableName = "__older"
           val olderAliasSql = {
-            s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTime}"
+            s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTmst}"
           }
           val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap)
 
