http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala deleted file mode 100644 index 00302a7..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String - ) extends Param { - -}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala deleted file mode 100644 index f5f8a4c..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import org.apache.griffin.measure.config.params.Param - -/** - * Created by xiaoqiu.duan on 2017/10/23. 
- */ -@JsonInclude(Include.NON_NULL) -case class EmailParam(@JsonProperty("host") host: String, - @JsonProperty("mail") mail: String, - @JsonProperty("user") usr: String, - @JsonProperty("password") pwd: String - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala deleted file mode 100644 index 629d06a..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class EnvParam(@JsonProperty("spark") sparkParam: SparkParam, - @JsonProperty("persist") persistParams: List[PersistParam], - @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam], - @JsonProperty("mail") emailParam: EmailParam, - @JsonProperty("sms") smsParam: SMSParam, - @JsonProperty("cleaner") cleanerParam: CleanerParam - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala deleted file mode 100644 index be588f9..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class InfoCacheParam( @JsonProperty("type") persistType: String, - @JsonProperty("config") config: Map[String, Any] - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala deleted file mode 100644 index 68b9bc8..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class PersistParam( @JsonProperty("type") persistType: String, - @JsonProperty("config") config: Map[String, Any] - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala deleted file mode 100644 index 0cc87a7..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import org.apache.griffin.measure.config.params.Param - -/** - * Created by xiaoqiu.duan on 2017/10/23. - */ -@JsonInclude(Include.NON_NULL) -case class SMSParam(@JsonProperty("host") host: String, - @JsonProperty("id") id: String, - @JsonProperty("key") key: String, - @JsonProperty("UUID") uuid: String - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala deleted file mode 100644 index a21a64f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.params.env - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class SparkParam( @JsonProperty("log.level") logLevel: String, - @JsonProperty("checkpoint.dir") cpDir: String, - @JsonProperty("batch.interval") batchInterval: String, - @JsonProperty("process.interval") processInterval: String, - @JsonProperty("config") config: Map[String, String], - @JsonProperty("init.clear") initClear: Boolean - ) extends Param { - - def needInitClear: Boolean = if (initClear != null) initClear else false - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala deleted file mode 100644 index a819997..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params.user - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class DataConnectorParam( @JsonProperty("type") conType: String, - @JsonProperty("version") version: String, - @JsonProperty("config") config: Map[String, Any], - @JsonProperty("pre.proc") preProc: List[Map[String, Any]] - ) extends Param { - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala deleted file mode 100644 index c43ea70..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params.user - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class DataSourceParam( @JsonProperty("name") name: String, - @JsonProperty("baseline") baseline: Boolean, - @JsonProperty("connectors") connectors: List[DataConnectorParam], - @JsonProperty("cache") cache: Map[String, Any] - ) extends Param { - def hasName: Boolean = (name != null) - def isBaseLine: Boolean = if (baseline == null) false else baseline - def falseBaselineClone: DataSourceParam = DataSourceParam(name, false, connectors, cache) - def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala deleted file mode 100644 index 2abf3e5..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params.user - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String, - @JsonProperty("rules") rules: List[Map[String, Any]] - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala deleted file mode 100644 index b0f7b29..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params.user - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.Param - -@JsonInclude(Include.NON_NULL) -case class UserParam( @JsonProperty("name") name: String, - @JsonProperty("timestamp") timestamp: Long, - @JsonProperty("process.type") procType: String, - @JsonProperty("data.sources") dataSourceParams: List[DataSourceParam], - @JsonProperty("evaluate.rule") evaluateRuleParam: EvaluateRuleParam - ) extends Param { - - private val validDs = { - val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) => - val (seq, names) = ret - if (dsParam.hasName && !names.contains(dsParam.name)) { - (seq :+ dsParam, names + dsParam.name) - } else ret - } - validDsParams - } - private val baselineDsOpt = { - val baselines = validDs.filter(_.isBaseLine) - if (baselines.nonEmpty) baselines.headOption - else validDs.headOption - } - - val baselineDsName = baselineDsOpt match { - case Some(ds) => ds.name - case _ => "" - } - val dataSources = { - validDs.map { ds => - if (ds.name != baselineDsName && ds.isBaseLine) { - ds.falseBaselineClone - } else ds - } - } - - override def validate(): Boolean = { - dataSources.nonEmpty - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala deleted file mode 100644 index ede68f4..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.reader - -import org.apache.griffin.measure.config.params.Param -import org.apache.griffin.measure.utils.JsonUtil - -import scala.util.Try - -case class ParamFileReader(file: String) extends ParamReader { - - def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { - Try { - val source = scala.io.Source.fromFile(file) - val lines = source.mkString - val param = JsonUtil.fromJson[T](lines) - source.close - param - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala deleted file mode 100644 index 8b51b11..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.reader - -import org.apache.griffin.measure.config.params.Param -import org.apache.griffin.measure.utils.JsonUtil -import org.apache.griffin.measure.utils.HdfsUtil - -import scala.util.Try - -case class ParamHdfsFileReader(filePath: String) extends ParamReader { - - def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { - Try { - val source = HdfsUtil.openFile(filePath) - val param = JsonUtil.fromJson[T](source) - source.close - param - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala deleted file mode 100644 index 87e1953..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.reader - -import org.apache.griffin.measure.config.params.Param -import org.apache.griffin.measure.utils.JsonUtil - -import scala.util.Try - -case class ParamRawStringReader(rawString: String) extends ParamReader { - - def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { - Try { - val param = JsonUtil.fromJson[T](rawString) - param - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala deleted file mode 100644 index 3508223..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.reader - -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.config.params.Param - -import scala.util.Try - -trait ParamReader extends Loggable with Serializable { - - def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala deleted file mode 100644 index 9299247..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.reader - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} - - -object ParamReaderFactory { - - val RawStringRegex = """^(?i)raw$""".r - val LocalFsRegex = """^(?i)local$""".r - val HdfsFsRegex = """^(?i)hdfs$""".r - - def getParamReader(filePath: String, fsType: String): ParamReader = { - fsType match { - case RawStringRegex() => ParamRawStringReader(filePath) - case LocalFsRegex() => ParamFileReader(filePath) - case HdfsFsRegex() => ParamHdfsFileReader(filePath) - case _ => ParamHdfsFileReader(filePath) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala deleted file mode 100644 index fd486e9..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.config.validator - -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.config.params._ - -import scala.util.Try - -object ParamValidator extends Loggable with Serializable { - - def validate[T <: Param](param: Param): Try[Boolean] = Try { - param.validate - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala new file mode 100644 index 0000000..154510f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala @@ -0,0 +1,112 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.configuration.enums + +import scala.util.matching.Regex + +/** + * effective when dsl type is "griffin-dsl", + * indicates the dq type of griffin pre-defined measurements + */ +sealed trait DqType { + val regex: Regex + val desc: String +} + +object DqType { + private val dqTypes: List[DqType] = List( + AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType + ) + def apply(ptn: String): DqType = { + dqTypes.find(tp => ptn match { + case tp.regex() => true + case _ => false + }).getOrElse(UnknownType) + } + def unapply(pt: DqType): Option[String] = Some(pt.desc) +} + +/** + * accuracy: the match percentage of items between source and target + * count(source items matched with the ones from target) / count(source) + * e.g.: source [1, 2, 3, 4, 5], target: [1, 2, 3, 4] + * metric will be: { "total": 5, "miss": 1, "matched": 4 } + * accuracy is 80%. + */ +final case object AccuracyType extends DqType { + val regex = "^(?i)accuracy$".r + val desc = "accuracy" +} + +/** + * profiling: the statistic data of data source + * e.g.: max, min, average, group by count, ... 
+ */ +final case object ProfilingType extends DqType { + val regex = "^(?i)profiling$".r + val desc = "profiling" +} + +/** + * uniqueness: the uniqueness of data source comparing with itself + * count(unique items in source) / count(source) + * e.g.: [1, 2, 3, 3] -> { "unique": 2, "total": 4, "dup-arr": [ "dup": 1, "num": 1 ] } + * uniqueness indicates the items without any replica of data + */ +final case object UniquenessType extends DqType { + val regex = "^(?i)uniqueness|duplicate$".r + val desc = "uniqueness" +} + +/** + * distinctness: the distinctness of data source comparing with itself + * count(distinct items in source) / count(source) + * e.g.: [1, 2, 3, 3] -> { "dist": 3, "total": 4, "dup-arr": [ "dup": 1, "num": 1 ] } + * distinctness indicates the valid information of data + * comparing with uniqueness, distinctness is more meaningful + */ +final case object DistinctnessType extends DqType { + val regex = "^(?i)distinct$".r + val desc = "distinct" +} + +/** + * timeliness: the latency of data source with timestamp information + * e.g.: (receive_time - send_time) + * timeliness can get the statistic metric of latency, like average, max, min, percentile-value, + * even more, it can record the items with latency above threshold you configured + */ +final case object TimelinessType extends DqType { + val regex = "^(?i)timeliness$".r + val desc = "timeliness" +} + +/** + * completeness: the completeness of data source + * the columns you measure is incomplete if it is null + */ +final case object CompletenessType extends DqType { + val regex = "^(?i)completeness$".r + val desc = "completeness" +} + +final case object UnknownType extends DqType { + val regex = "".r + val desc = "unknown" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala new file mode 100644 index 0000000..c6b68d3 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala @@ -0,0 +1,64 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.configuration.enums + +import scala.util.matching.Regex + +/** + * dsl type indicates the language type of rule param + */ +sealed trait DslType { + val regex: Regex + val desc: String +} + +object DslType { + private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, DataFrameOpsType) + def apply(ptn: String): DslType = { + dslTypes.find(tp => ptn match { + case tp.regex() => true + case _ => false + }).getOrElse(GriffinDslType) + } + def unapply(pt: DslType): Option[String] = Some(pt.desc) +} + +/** + * spark-sql: rule defined in "SPARK-SQL" directly + */ +final case object SparkSqlType extends DslType { + val regex = "^(?i)spark-?sql$".r + val desc = "spark-sql" +} + +/** + * df-ops: data frame operations rule, support some pre-defined data frame ops + */ +final case object DataFrameOpsType extends DslType { + val regex = "^(?i)df-?(?:op|opr|ops)$".r + val desc = "df-opr" +} + +/** + * griffin-dsl: griffin dsl rule, to define dq measurements easier + */ +final case object GriffinDslType extends DslType { + val regex = "^(?i)griffin-?dsl$".r + val desc = "griffin-dsl" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala new file mode 100644 index 0000000..d8cfcf8 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala @@ -0,0 +1,86 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.enums + +import scala.util.matching.Regex + +/** + * the normalize strategy to collect metric + */ +sealed trait NormalizeType { + val regex: Regex + val desc: String +} + +object NormalizeType { + private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType) + def apply(ptn: String): NormalizeType = { + normalizeTypes.find(tp => ptn match { + case tp.regex() => true + case _ => false + }).getOrElse(DefaultNormalizeType) + } + def unapply(pt: NormalizeType): Option[String] = Some(pt.desc) +} + +/** + * default normalize strategy + * metrics contains n rows -> normalized metric json map + * n = 0: { } + * n = 1: { "col1": "value1", "col2": "value2", ... } + * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] } + * all rows + */ +final case object DefaultNormalizeType extends NormalizeType { + val regex: Regex = "".r + val desc: String = "default" +} + +/** + * metrics contains n rows -> normalized metric json map + * n = 0: { } + * n >= 1: { "col1": "value1", "col2": "value2", ... 
} + * the first row only + */ +final case object EntriesNormalizeType extends NormalizeType { + val regex: Regex = "^(?i)entries$".r + val desc: String = "entries" +} + +/** + * metrics contains n rows -> normalized metric json map + * n = 0: { "arr-name": [ ] } + * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] } + * all rows + */ +final case object ArrayNormalizeType extends NormalizeType { + val regex: Regex = "^(?i)array|list$".r + val desc: String = "array" +} + +/** + * metrics contains n rows -> normalized metric json map + * n = 0: { "map-name": { } } + * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } } + * the first row only + */ +final case object MapNormalizeType extends NormalizeType { + val regex: Regex = "^(?i)map$".r + val desc: String = "map" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala new file mode 100644 index 0000000..18a82bf --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala @@ -0,0 +1,56 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.enums + +import scala.util.matching.Regex + +/** + * process type enum + */ +sealed trait ProcessType { + val regex: Regex + val desc: String +} + +object ProcessType { + private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType) + def apply(ptn: String): ProcessType = { + procTypes.find(tp => ptn match { + case tp.regex() => true + case _ => false + }).getOrElse(BatchProcessType) + } + def unapply(pt: ProcessType): Option[String] = Some(pt.desc) +} + +/** + * process in batch mode + */ +final case object BatchProcessType extends ProcessType { + val regex = """^(?i)batch$""".r + val desc = "batch" +} + +/** + * process in streaming mode + */ +final case object StreamingProcessType extends ProcessType { + val regex = """^(?i)streaming$""".r + val desc = "streaming" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala new file mode 100644 index 0000000..fd73da3 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala @@ -0,0 +1,43 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.enums + +/** + * write mode when write metrics and records + */ +sealed trait WriteMode {} + +object WriteMode { + def defaultMode(procType: ProcessType): WriteMode = { + procType match { + case BatchProcessType => SimpleMode + case StreamingProcessType => TimestampMode + } + } +} + +/** + * simple mode: write metrics and records directly + */ +final case object SimpleMode extends WriteMode {} + +/** + * timestamp mode: write metrics and records with timestamp information + */ +final case object TimestampMode extends WriteMode {} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala new file mode 100644 index 0000000..5091d49 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala @@ -0,0 +1,43 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.json + +import org.apache.griffin.measure.configuration.params.Param +import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} + +import scala.util.Try + +/** + * read params from config file path + * @param filePath: hdfs path ("hdfs://cluster-name/path") + * local file path ("file:///path") + * relative file path ("relative/path") + */ +case class ParamFileReader(filePath: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val source = HdfsUtil.openFile(filePath) + val param = JsonUtil.fromJson[T](source) + source.close + param + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala new file mode 100644 index 0000000..2f4a5e7 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala @@ -0,0 +1,39 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or 
more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.json + +import org.apache.griffin.measure.configuration.params.Param +import org.apache.griffin.measure.utils.JsonUtil + +import scala.util.Try + +/** + * read params from json string directly + * @param jsonString + */ +case class ParamJsonReader(jsonString: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val param = JsonUtil.fromJson[T](jsonString) + param + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala new file mode 100644 index 0000000..d923cd5 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala @@ -0,0 +1,35 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.json + +import org.apache.griffin.measure.Loggable +import org.apache.griffin.measure.configuration.params.Param + +import scala.util.Try + +trait ParamReader extends Loggable with Serializable { + + /** + * read config param + * @tparam T param type expected + * @return parsed param + */ + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala new file mode 100644 index 0000000..14fce02 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.json + +import org.apache.griffin.measure.utils.JsonUtil + +object ParamReaderFactory { + + val json = "json" + val file = "file" + + /** + * parse string content to get param reader + * @param pathOrJson + * @return + */ + def getParamReader(pathOrJson: String): ParamReader = { + val strType = paramStrType(pathOrJson) + if (json.equals(strType)) ParamJsonReader(pathOrJson) + else ParamFileReader(pathOrJson) + } + + private def paramStrType(str: String): String = { + try { + JsonUtil.toAnyMap(str) + json + } catch { + case e: Throwable => file + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala new file mode 100644 index 0000000..4ba1a15 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala @@ -0,0 +1,36 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.params + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include + +/** + * full set of griffin configuration + * @param envParam environment configuration (must) + * @param dqParam dq measurement configuration (must) + */ +@JsonInclude(Include.NON_NULL) +case class AllParam( @JsonProperty("env") envParam: EnvParam, + @JsonProperty("dq") dqParam: DQParam + ) extends Param { + override def validate(): Boolean = { + envParam != null && dqParam != null + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala new file mode 100644 index 0000000..8d3c354 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala @@ -0,0 +1,183 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.params + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import org.apache.commons.lang.StringUtils +import org.apache.griffin.measure.configuration.enums.{DqType, DslType} + +/** + * dq param + * @param name name of dq measurement (must) + * @param timestamp default timestamp of measure in batch mode (optional) + * @param procType batch mode or streaming mode (must) + * @param dataSourceParams data sources (optional) + * @param evaluateRuleParam dq measurement (optional) + */ +@JsonInclude(Include.NON_NULL) +case class DQParam( @JsonProperty("name") name: String, + @JsonProperty("timestamp") timestamp: Long, + @JsonProperty("process.type") procType: String, + @JsonProperty("data.sources") dataSourceParams: List[DataSourceParam], + @JsonProperty("evaluate.rule") evaluateRuleParam: EvaluateRuleParam + ) extends Param { + val dataSources = { + val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) => + val (seq, names) = ret + if (dsParam.hasName && !names.contains(dsParam.name)) { + (seq :+ dsParam, names + dsParam.name) + } else ret + } + validDsParams + } + val evaluateRule: EvaluateRuleParam = { + if (evaluateRuleParam != null) evaluateRuleParam + else EvaluateRuleParam("", Nil) + } + + override def validate(): Boolean = { + dataSources.nonEmpty + } +} + +/** + * data source param + * @param name data source name (must) + * @param connectors data connectors (optional) + * @param 
cache data source cache configuration (must in streaming mode with streaming connectors) + */ +@JsonInclude(Include.NON_NULL) +case class DataSourceParam( @JsonProperty("name") name: String, + @JsonProperty("connectors") connectors: List[DataConnectorParam], + @JsonProperty("cache") cache: Map[String, Any] + ) extends Param { + def hasName: Boolean = StringUtils.isNotBlank(name) + def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil + def hasCache: Boolean = (cache != null) + + override def validate(): Boolean = hasName +} + +/** + * data connector param + * @param conType data connector type, e.g.: hive, avro, kafka (must) + * @param version data connector type version (optional) + * @param config detail configuration of data connector (must) + * @param preProc pre-process rules after load data (optional) + */ +@JsonInclude(Include.NON_NULL) +case class DataConnectorParam( @JsonProperty("type") conType: String, + @JsonProperty("version") version: String, + @JsonProperty("config") config: Map[String, Any], + @JsonProperty("pre.proc") preProc: List[RuleParam] + ) extends Param { + override def validate(): Boolean = { + StringUtils.isNotBlank(conType) + } +} + +/** + * evaluate rule param + * @param dslType default dsl type for all rules (optional) + * @param rules rules to define dq measurement (optional) + */ +@JsonInclude(Include.NON_NULL) +case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String, + @JsonProperty("rules") rules: List[RuleParam] + ) extends Param { + def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("") + def getRules: List[RuleParam] = if (rules != null) rules else Nil +} + +/** + * rule param + * @param dslType dsl type of this rule (must if default dsl type not set) + * @param dqType dq type of this rule (valid for "griffin-dsl") + * @param name name of result calculated by this rule (must if for later usage) + * @param rule rule to define dq step calculation (must) + 
* @param details detail config of rule (optional) + * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode) + * @param metric config for metric output (optional) + * @param record config for record output (optional) + * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode) + */ +@JsonInclude(Include.NON_NULL) +case class RuleParam( @JsonProperty("dsl.type") dslType: String, + @JsonProperty("dq.type") dqType: String, + @JsonProperty("name") name: String, + @JsonProperty("rule") rule: String, + @JsonProperty("details") details: Map[String, Any], + @JsonProperty("cache") cache: Boolean, + @JsonProperty("metric") metric: RuleMetricParam, + @JsonProperty("record") record: RuleRecordParam, + @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam + ) extends Param { + def getDslType(defaultDslType: DslType): DslType = if (dslType != null) DslType(dslType) else defaultDslType + def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("") + def getName: String = if (name != null) name else "" + def getRule: String = if (rule != null) rule else "" + def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]() + def getCache: Boolean = if (cache != null) cache else false + + def metricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None + def recordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None + def dsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None + + def replaceName(newName: String): RuleParam = { + if (StringUtils.equals(newName, name)) this + else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate) + } + def replaceRule(newRule: String): RuleParam = { + if (StringUtils.equals(newRule, rule)) this + else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, 
dsCacheUpdate) + } + def replaceDetails(newDetails: Map[String, Any]): RuleParam = { + RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate) + } +} + +/** + * metric param of rule + * @param name name of metric to output (optional) + * @param collectType the normalize strategy to collect metric (optional) + */ +@JsonInclude(Include.NON_NULL) +case class RuleMetricParam( @JsonProperty("name") name: String, + @JsonProperty("collect.type") collectType: String + ) extends Param { +} + +/** + * record param of rule + * @param name name of record to output (optional) + */ +@JsonInclude(Include.NON_NULL) +case class RuleRecordParam( @JsonProperty("name") name: String + ) extends Param { +} + +/** + * data source cache update param of rule + * @param dsName name of data source to be updated by thie rule result (must) + */ +@JsonInclude(Include.NON_NULL) +case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String + ) extends Param { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala new file mode 100644 index 0000000..5ee0610 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala @@ -0,0 +1,97 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.params + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import org.apache.commons.lang.StringUtils + +/** + * environment param + * @param sparkParam config of spark environment (must) + * @param persistParams config of persist ways (optional) + * @param infoCacheParams config of information cache ways (must in streaming mode) + * @param cleanerParam config of cleaner (optional) + */ +@JsonInclude(Include.NON_NULL) +case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam, + @JsonProperty("persist") persistParams: List[PersistParam], + @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam], + @JsonProperty("cleaner") cleanerParam: CleanerParam + ) extends Param { +} + +/** + * spark param + * @param logLevel log level of spark application (optional, defaults to WARN) + * @param cpDir checkpoint directory for spark streaming (must in streaming mode) + * @param batchInterval batch interval for spark streaming (must in streaming mode) + * @param processInterval process interval for streaming dq calculation (must in streaming mode) + * @param config extra config for spark environment (optional) + * @param initClear whether to clear the checkpoint directory on initialization (optional) + */ +@JsonInclude(Include.NON_NULL) +case class SparkParam( @JsonProperty("log.level") logLevel: String, + @JsonProperty("checkpoint.dir") cpDir: String, + @JsonProperty("batch.interval") batchInterval: String, + @JsonProperty("process.interval") processInterval:
String, + @JsonProperty("config") config: Map[String, String], + @JsonProperty("init.clear") initClear: Boolean + ) extends Param { + def getLogLevel: String = Option(logLevel).getOrElse("WARN") + def needInitClear: Boolean = initClear // primitive Boolean: the former `initClear != null` check was always true +} + +/** + * persist param + * @param persistType persist type, e.g.: log, hdfs, http, mongo (must) + * @param config config of persist way (must) + */ +@JsonInclude(Include.NON_NULL) +case class PersistParam( @JsonProperty("type") persistType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + override def validate(): Boolean = { + StringUtils.isNotBlank(persistType) + } +} + +/** + * info cache param + * @param cacheType information cache type, e.g.: zookeeper (must) + * @param config config of cache way + */ +@JsonInclude(Include.NON_NULL) +case class InfoCacheParam( @JsonProperty("type") cacheType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + override def validate(): Boolean = { + StringUtils.isNotBlank(cacheType) + } +} + +/** + * cleaner param, currently not in effect + * @param cleanInterval clean interval (optional) + */ +@JsonInclude(Include.NON_NULL) +case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String + ) extends Param { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala new file mode 100644 index 0000000..87ad246 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala @@ -0,0 +1,29 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more
contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.params + +trait Param extends Serializable { + + /** + * validate this param internally; implementations may override to add checks + * @return true if the param is valid; this default implementation accepts every param + */ + def validate(): Boolean = true + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala new file mode 100644 index 0000000..4c9a4d6 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.validator + +import org.apache.griffin.measure.Loggable +import org.apache.griffin.measure.configuration.params._ + +import scala.util.Try + +object ParamValidator extends Loggable with Serializable { + + /** + * validate a param by delegating to its own validate() method + * @param param param to be validated + * @tparam T type of param + * @return Success holding the param's own verdict, or Failure if validate() throws + */ + def validate[T <: Param](param: T): Try[Boolean] = Try { + param.validate() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala b/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala new file mode 100644 index 0000000..8848a22 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala @@ -0,0 +1,28 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context + +/** + * identifier of a dq context: the timestamp, prefixed by the tag and an underscore when a tag is given + */ +case class ContextId(timestamp: Long, tag: String = "") extends Serializable { + def id: String = + if (tag.isEmpty) s"${timestamp}" + else s"${tag}_${timestamp}" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala new file mode 100644 index 0000000..e4b5046 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala @@ -0,0 +1,94 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied.
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context + +import org.apache.griffin.measure.configuration.enums._ +import org.apache.griffin.measure.configuration.params._ +import org.apache.griffin.measure.context.datasource._ +import org.apache.griffin.measure.context.writer._ +import org.apache.spark.sql.{Encoder, Encoders, SQLContext, SparkSession} + +/** + * dq context: the context of each calculation + */ +case class DQContext(contextId: ContextId, + name: String, + dataSources: Seq[DataSource], + persistParams: Seq[PersistParam], + procType: ProcessType + )(@transient implicit val sparkSession: SparkSession) { + + val sqlContext: SQLContext = sparkSession.sqlContext + + val compileTableRegister: CompileTableRegister = CompileTableRegister() + val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sqlContext) + + val dataFrameCache: DataFrameCache = DataFrameCache() + + val metricWrapper: MetricWrapper = MetricWrapper(name) + val writeMode = WriteMode.defaultMode(procType) + + val dataSourceNames: Seq[String] = dataSources.map(_.name) + dataSourceNames.foreach(dsName => compileTableRegister.registerTable(dsName)) + implicit val encoder: Encoder[String] = Encoders.STRING // String encoder for the .map(_.name) below + val functionNames: Seq[String] = sparkSession.catalog.listFunctions().map(_.name).collect().toSeq + + val dataSourceTimeRanges: Map[String, TimeRange] = loadDataSources() + def loadDataSources(): Map[String, TimeRange] = { + dataSources.map { ds => + (ds.name, ds.loadData(this)) + }.toMap + } + printTimeRanges() + + def getDataSourceName(index: Int): String = { + dataSourceNames.lift(index).getOrElse("") // "" for any out-of-range index, negative ones included + } + + private val persistFactory = PersistFactory(persistParams, name) + private val defaultPersist: Persist = persistFactory.getPersists(contextId.timestamp) + def getPersist(timestamp: Long): Persist = { + if (timestamp == contextId.timestamp) getPersist() + else persistFactory.getPersists(timestamp) + } + def
getPersist(): Persist = defaultPersist + + def cloneDQContext(newContextId: ContextId): DQContext = { + DQContext(newContextId, name, dataSources, persistParams, procType)(sparkSession) + } + + def clean(): Unit = { + compileTableRegister.unregisterAllTables() + runTimeTableRegister.unregisterAllTables() + + dataFrameCache.uncacheAllDataFrames() + dataFrameCache.clearAllTrashDataFrames() + } + + private def printTimeRanges(): Unit = { + if (dataSourceTimeRanges.nonEmpty) { + val timeRangesStr = dataSourceTimeRanges.map { case (dsName, timeRange) => + // each range is printed as the interval (begin, end] + s"${dsName} -> (${timeRange.begin}, ${timeRange.end}]" + }.mkString(", ") + println(s"data source timeRanges: ${timeRangesStr}") + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala new file mode 100644 index 0000000..765dc6f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala @@ -0,0 +1,72 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied.
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context + +import org.apache.griffin.measure.Loggable +import org.apache.spark.sql.DataFrame + +import scala.collection.concurrent.{Map => ConcMap} +import scala.collection.mutable.{MutableList, Map => MutableMap} + +/** + * cache dataframes by name; replaced or uncached ones are collected as trash and unpersisted on clearAllTrashDataFrames + */ +case class DataFrameCache() extends Loggable { + + private val dataFrames: MutableMap[String, DataFrame] = MutableMap() + private val trashDataFrames: MutableList[DataFrame] = MutableList() + + private def trashDataFrame(df: DataFrame): Unit = { + trashDataFrames += df + } + private def trashAllDataFrames(dfs: Seq[DataFrame]): Unit = { + trashDataFrames ++= dfs + } + + def cacheDataFrame(name: String, df: DataFrame): Unit = { + info(s"try to cache data frame ${name}") + dataFrames.get(name) match { + case Some(odf) => { + if (odf ne df) trashDataFrame(odf) // re-caching the same instance must not schedule it for unpersist + dataFrames += (name -> df) + df.cache() + info(s"cache after replace old df") + } + case _ => { + dataFrames += (name -> df) + df.cache() + info(s"cache after replace no old df") + } + } + } + + def uncacheDataFrame(name: String): Unit = { + dataFrames.get(name).foreach(df => trashDataFrame(df)) + dataFrames -= name + } + def uncacheAllDataFrames(): Unit = { + trashAllDataFrames(dataFrames.values.toSeq) + dataFrames.clear() + } + + def clearAllTrashDataFrames(): Unit = { + trashDataFrames.foreach(_.unpersist()) + trashDataFrames.clear() // release the references so later calls do not unpersist them again + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala new file mode 100644 index 0000000..d98952a --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -0,0 +1,49 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context + +import scala.collection.mutable.{Map => MutableMap} + +/** + * accumulates metric maps per timestamp and wraps each one with the metric name on flush + */ +case class MetricWrapper(name: String) extends Serializable { + + val metrics: MutableMap[Long, Map[String, Any]] = MutableMap() + + def insertMetric(timestamp: Long, value: Map[String, Any]): Unit = { + // merge with whatever was already recorded for this timestamp; + // on duplicate keys the newly inserted entry wins + val merged: Map[String, Any] = + metrics.get(timestamp).fold(value)(_ ++ value) + metrics.update(timestamp, merged) + } + + def flush: Map[Long, Map[String, Any]] = { + // one output map per timestamp, carrying name, timestamp and the merged value + metrics.toMap.map { case (ts, metricValue) => + ts -> Map[String, Any]( + "name" -> name, + "timestamp" -> ts, + "value" -> metricValue + ) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala new file mode 100644 index 0000000..8d86170 --- /dev/null +++
b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala @@ -0,0 +1,81 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context + +import org.apache.griffin.measure.Loggable +import org.apache.spark.sql._ + +import scala.collection.mutable.{Set => MutableSet} + +/** + * keeps the set of registered table names + */ +trait TableRegister extends Loggable with Serializable { + + protected val tables: MutableSet[String] = MutableSet() + + def registerTable(name: String): Unit = { + tables += name + } + + def existsTable(name: String): Boolean = { + tables.contains(name) // set lookup instead of a linear scan over all names + } + + def unregisterTable(name: String): Unit = { + if (existsTable(name)) tables -= name + } + def unregisterAllTables(): Unit = { + tables.clear() + } + + def getTables(): Set[String] = { + tables.toSet + } + +} + +/** + * register table name when building dq job + */ +case class CompileTableRegister() extends TableRegister {} + +/** + * register table name and create temp view during calculation + */ +case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends TableRegister { + + def registerTable(name: String, df: DataFrame): Unit = { + registerTable(name) + df.createOrReplaceTempView(name) + } + + override def
unregisterTable(name: String): Unit = { + if (existsTable(name)) { + sqlContext.dropTempTable(name) + tables -= name + } + } + override def unregisterAllTables(): Unit = { + val uts = getTables() + uts.foreach(t => sqlContext.dropTempTable(t)) + tables.clear() + } + +} \ No newline at end of file
