Repository: incubator-griffin Updated Branches: refs/heads/master 19f68c320 -> 60c022176
update as clause and add timestamp in configure Author: Lionel Liu <[email protected]> Closes #170 from bhlx3lyx7/docker. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/60c02217 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/60c02217 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/60c02217 Branch: refs/heads/master Commit: 60c02217625fb6e12bfc0ee01328d39b0908828d Parents: 19f68c3 Author: Lionel Liu <[email protected]> Authored: Mon Nov 6 15:53:29 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Mon Nov 6 15:53:29 2017 +0800 ---------------------------------------------------------------------- .../measure/config/params/user/UserParam.scala | 1 + .../griffin/measure/process/BatchDqProcess.scala | 2 +- .../apache/griffin/measure/process/DqProcess.scala | 5 +++++ .../griffin/measure/process/StreamingDqProcess.scala | 2 +- .../griffin/measure/rule/dsl/expr/FunctionExpr.scala | 6 +++++- .../griffin/measure/rule/dsl/expr/SelectExpr.scala | 14 +++++++------- measure/src/test/resources/config-test-accuracy.json | 2 ++ .../measure/rule/adaptor/GriffinDslAdaptorTest.scala | 2 +- .../measure/rule/dsl/parser/BasicParserTest.scala | 2 +- .../apache/griffin/measure/utils/HdfsUtilTest.scala | 2 +- 10 files changed, 25 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala index e55d2b4..173f8f4 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala @@ -24,6 +24,7 @@ import org.apache.griffin.measure.config.params.Param @JsonInclude(Include.NON_NULL) case class UserParam( @JsonProperty("name") name: String, + @JsonProperty("timestamp") timestamp: Long, @JsonProperty("process.type") procType: String, @JsonProperty("data.sources") dataSources: List[DataSourceParam], @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala index 737a43f..dc8b79a 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala @@ -65,7 +65,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { def run: Try[_] = Try { // start time - val startTime = new Date().getTime() + val startTime = getStartTime // get persists to persist measure result val persistFactory = PersistFactory(envParam.persistParams, metricName) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala index 50b04a8..7ff29d6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala @@ -37,4 +37,9 @@ trait DqProcess extends Loggable with Serializable { def retriable: Boolean + protected def getStartTime: Long = { + if (userParam.timestamp != null && userParam.timestamp > 0) { userParam.timestamp } + else { System.currentTimeMillis } + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala index a567941..3fe8b3f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala @@ -82,7 +82,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess { }) // start time - val startTime = new Date().getTime() + val startTime = getStartTime // get persists to persist measure result val persistFactory = PersistFactory(envParam.persistParams, metricName) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala index b82fd96..e33b03d 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala @@ -25,5 +25,9 @@ case class FunctionExpr(functionName: String, args: Seq[Expr], aliasOpt: Option[ def desc: String = s"${functionName}(${args.map(_.desc).mkString(", ")})" def coalesceDesc: String = desc - def alias: Option[String] = if (aliasOpt.isEmpty) Some(functionName) else aliasOpt + def alias: Option[String] = { + if (aliasOpt.isEmpty) { + Some(functionName) + } else aliasOpt + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala index fd803a8..d1cc86e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala @@ -18,8 +18,8 @@ under the License. */ package org.apache.griffin.measure.rule.dsl.expr -trait HeadExpr extends Expr { - +trait HeadExpr extends Expr with AliasableExpr { + def alias: Option[String] = None } case class DataSourceHeadExpr(name: String) extends HeadExpr { @@ -30,6 +30,7 @@ case class DataSourceHeadExpr(name: String) extends HeadExpr { case class FieldNameHeadExpr(field: String) extends HeadExpr { def desc: String = field def coalesceDesc: String = desc + override def alias: Option[String] = Some(field) } case class ALLSelectHeadExpr() extends HeadExpr { @@ -43,6 +44,7 @@ case class OtherHeadExpr(expr: Expr) extends HeadExpr { def desc: String = expr.desc def coalesceDesc: String = expr.coalesceDesc + override def alias: Option[String] = Some(expr.desc) } // ------------- @@ -68,7 +70,7 @@ case class IndexSelectExpr(index: Expr) extends SelectExpr { def desc: String = s"[${index.desc}]" def coalesceDesc: String = desc - def alias: Option[String] = Some(desc) + def alias: Option[String] = Some(index.desc) } case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends SelectExpr { @@ -106,10 +108,8 @@ case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: O } def alias: Option[String] = { if (aliasOpt.isEmpty) { - selectors.lastOption match { - case Some(last) => last.alias - case _ => None - } + val aliasSeq = (head +: selectors).flatMap(_.alias) + if (aliasSeq.size > 0) Some(aliasSeq.mkString("_")) else None } else aliasOpt } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/test/resources/config-test-accuracy.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test-accuracy.json b/measure/src/test/resources/config-test-accuracy.json index ecbdaaa..7f637a0 100644 --- a/measure/src/test/resources/config-test-accuracy.json +++ b/measure/src/test/resources/config-test-accuracy.json @@ -1,6 +1,8 @@ { "name": "accu_batch_test", + "timestamp": 12124214, + "process.type": "batch", "data.sources": [ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala index 987a060..8097964 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala @@ -36,7 +36,7 @@ class GriffinDslAdaptorTest extends FunSuite with Matchers with BeforeAndAfter w |{ | "dsl.type": "griffin-dsl", | "dq.type": "profiling", - | "rule": "source.age, (source.user_id.COUNT() + 1s) as cnt group by source.age having source.desc.count() > 5 or false order by user_id desc, user_name asc limit 5", + | "rule": "source.age, source.age.count(), (source.user_id.COUNT() + 1s) as cnt group by source.age having source.desc.count() > 5 or false order by user_id desc, user_name asc limit 5", | "details": { | "source": "source", | "profiling": { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/test/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParserTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParserTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParserTest.scala index a1b9a83..5f13af7 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParserTest.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParserTest.scala @@ -90,7 +90,7 @@ class BasicParserTest extends FunSuite with Matchers with BeforeAndAfter { val result3 = parser.parseAll(parser.selection, rule3) result3.successful should be (true) result3.get.desc should be ("source[12].age") - result3.get.alias should be (Some("age")) + result3.get.alias should be (Some("12_age")) val rule4 = """source.name.func(target.name)""" val result4 = parser.parseAll(parser.selection, rule4) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60c02217/measure/src/test/scala/org/apache/griffin/measure/utils/HdfsUtilTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/HdfsUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/HdfsUtilTest.scala index 6a672d5..5b94901 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/utils/HdfsUtilTest.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/utils/HdfsUtilTest.scala @@ -29,7 +29,7 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -@RunWith(classOf[JUnitRunner]) +//@RunWith(classOf[JUnitRunner]) class HdfsUtilTest extends FunSuite with Matchers with BeforeAndAfter { private val seprator = "/"
