Repository: incubator-griffin Updated Branches: refs/heads/master a92697225 -> d70751710
fix tmst range bug in streaming mode from [a, b) to (a, b] Author: Lionel Liu <[email protected]> Closes #209 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/d7075171 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/d7075171 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/d7075171 Branch: refs/heads/master Commit: d70751710602ec6ffec0790a3b45fda3d345bc02 Parents: a926972 Author: Lionel Liu <[email protected]> Authored: Mon Feb 5 12:36:15 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Mon Feb 5 12:36:15 2018 +0800 ---------------------------------------------------------------------- .../data/source/cache/DataSourceCache.scala | 35 +++++++++++++------- .../measure/process/StreamingDqThread.scala | 2 +- .../rule/trans/DistinctnessRulePlanTrans.scala | 5 +-- 3 files changed, 25 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d7075171/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala index 419b141..d61f294 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala @@ -40,12 +40,14 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { val index: Int var tmstCache: TmstCache = _ - protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until) + protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.range(from, until) protected def clearTmst(t: Long) = tmstCache.remove(t) protected def clearTmstsUntil(until: Long) = { val outDateTmsts = tmstCache.until(until) tmstCache.remove(outDateTmsts) } + protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1) + protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1) val _FilePath = "file.path" val _InfoPath = "info.path" @@ -127,13 +129,18 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { // read new cache data and old cache data def readData(): (Option[DataFrame], TimeRange) = { - // time range: [a, b) + // time range: (a, b] val timeRange = TimeInfoCache.getTimeRange - val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2 + 1) + val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) // read partition info - val filterStr = s"`${InternalColumns.tmst}` >= ${reviseTimeRange._1} AND `${InternalColumns.tmst}` < ${reviseTimeRange._2}" - println(s"read time range: [${reviseTimeRange._1}, ${reviseTimeRange._2})") + val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) { + println(s"read time range: [${reviseTimeRange._1}]") + s"`${InternalColumns.tmst}` = ${reviseTimeRange._1}" + } else { + println(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]") + s"`${InternalColumns.tmst}` > ${reviseTimeRange._1} AND `${InternalColumns.tmst}` <= ${reviseTimeRange._2}" + } // new cache data val newDfOpt = try { @@ -167,7 +174,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { // from until tmst range val (from, until) = (reviseTimeRange._1, reviseTimeRange._2) - val tmstSet = rangeTmsts(from, until) + val tmstSet = afterTilRangeTmsts(from, until) val retTimeRange = TimeRange(reviseTimeRange, tmstSet) (cacheDfOpt, retTimeRange) @@ -184,14 +191,14 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { } private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String]): Unit = { - val earlierPaths = listEarlierPartitions(path: String, outTime, partitionOpt) + val earlierOrEqPaths = listEarlierOrEqPartitions(path: String, outTime, partitionOpt) // delete out time data path - earlierPaths.foreach { path => + earlierOrEqPaths.foreach { path => println(s"delete hdfs path: ${path}") HdfsUtil.deleteHdfsPath(path) } } - private def listEarlierPartitions(path: String, bound: Long, partitionOpt: Option[String]): Iterable[String] = { + private def listEarlierOrEqPartitions(path: String, bound: Long, partitionOpt: Option[String]): Iterable[String] = { val names = HdfsUtil.listSubPathsByType(path, "dir") val regex = partitionOpt match { case Some(partition) => s"^${partition}=(\\d+)$$".r @@ -201,7 +208,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { name match { case regex(value) => { str2Long(value) match { - case Some(t) => (t < bound) + case Some(t) => (t <= bound) case _ => false } } @@ -219,6 +226,10 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { // clean out time from new cache data and old cache data def cleanOutTimeData(): Unit = { + // clean tmst + val cleanTime = readCleanTime + cleanTime.foreach(clearTmstsTil(_)) + if (!readOnly) { // new cache data val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime @@ -287,7 +298,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { val cleanTime = readCleanTime val updateDf = cleanTime match { case Some(ct) => { - val filterStr = s"`${InternalColumns.tmst}` >= ${ct}" + val filterStr = s"`${InternalColumns.tmst}` > ${ct}" df.filter(filterStr) } case _ => df @@ -326,7 +337,7 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable { submitLastProcTime(timeRange._2) // next clean time - val nextCleanTime = timeRange._2 + deltaTimeRange._1 + 1 + val nextCleanTime = timeRange._2 + deltaTimeRange._1 submitCleanTime(nextCleanTime) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d7075171/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala index dc49df0..f67724f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala @@ -138,7 +138,7 @@ case class StreamingDqThread(sqlContext: SQLContext, private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = { val timeRangesStr = timeRanges.map { pair => val (name, timeRange) = pair - s"${name} -> [${timeRange.begin}, ${timeRange.end})" + s"${name} -> (${timeRange.begin}, ${timeRange.end}]" }.mkString(", ") println(s"data source timeRanges: ${timeRangesStr}") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d7075171/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala index 40a8102..f45911f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala @@ -61,9 +61,6 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], val ct = timeInfo.calcTime - val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct)) - val beginTime = sourceTimeRange.begin - val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt) val beginTmst = beginTmstOpt match { case Some(t) => t @@ -126,7 +123,7 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], // 4. older alias val olderAliasTableName = "__older" val olderAliasSql = { - s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTime}" + s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTmst}" } val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap)
