refactored the 'measure' portion to have a clearer structure Author: Grant Guo <[email protected]>
Closes #287 from grant-xuexu/feature/refactoring. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1d7acd57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1d7acd57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1d7acd57 Branch: refs/heads/master Commit: 1d7acd57a9dbbaec4470d948f7d395481b84e19d Parents: 4ca4cdb Author: Grant Guo <[email protected]> Authored: Tue May 29 12:22:31 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue May 29 12:22:31 2018 +0800 ---------------------------------------------------------------------- measure/pom.xml | 408 +++++++++---------- measure/src/main/resources/config-old.json | 31 -- measure/src/main/resources/config-sql.json | 54 --- .../src/main/resources/config-streaming.json | 61 --- measure/src/main/resources/config.json | 59 --- measure/src/main/resources/env.json | 60 --- .../apache/griffin/measure/Application.scala | 94 ++--- .../org/apache/griffin/measure/Loggable.scala | 43 ++ .../griffin/measure/cache/info/InfoCache.scala | 39 -- .../measure/cache/info/InfoCacheFactory.scala | 41 -- .../measure/cache/info/InfoCacheInstance.scala | 53 --- .../measure/cache/info/TimeInfoCache.scala | 127 ------ .../measure/cache/info/ZKInfoCache.scala | 212 ---------- .../griffin/measure/cache/lock/CacheLock.scala | 31 -- .../measure/cache/lock/MultiCacheLock.scala | 39 -- .../measure/cache/lock/ZKCacheLock.scala | 53 --- .../measure/cache/result/CacheResult.scala | 29 -- .../cache/result/CacheResultProcesser.scala | 71 ---- .../griffin/measure/cache/tmst/TempName.scala | 47 --- .../griffin/measure/cache/tmst/TmstCache.scala | 46 --- .../measure/config/params/AllParam.scala | 32 -- .../griffin/measure/config/params/Param.scala | 25 -- .../config/params/env/CleanerParam.scala | 29 -- .../measure/config/params/env/EmailParam.scala | 36 -- .../measure/config/params/env/EnvParam.scala | 34 -- 
.../config/params/env/InfoCacheParam.scala | 30 -- .../config/params/env/PersistParam.scala | 30 -- .../measure/config/params/env/SMSParam.scala | 36 -- .../measure/config/params/env/SparkParam.scala | 36 -- .../config/params/user/DataConnectorParam.scala | 33 -- .../config/params/user/DataSourceParam.scala | 35 -- .../config/params/user/EvaluateRuleParam.scala | 30 -- .../measure/config/params/user/UserParam.scala | 64 --- .../measure/config/reader/ParamFileReader.scala | 38 -- .../config/reader/ParamHdfsFileReader.scala | 38 -- .../config/reader/ParamRawStringReader.scala | 35 -- .../measure/config/reader/ParamReader.scala | 30 -- .../config/reader/ParamReaderFactory.scala | 40 -- .../config/validator/ParamValidator.scala | 32 -- .../measure/configuration/enums/DqType.scala | 112 +++++ .../measure/configuration/enums/DslType.scala | 64 +++ .../configuration/enums/NormalizeType.scala | 86 ++++ .../configuration/enums/ProcessType.scala | 56 +++ .../measure/configuration/enums/WriteMode.scala | 43 ++ .../configuration/json/ParamFileReader.scala | 43 ++ .../configuration/json/ParamJsonReader.scala | 39 ++ .../configuration/json/ParamReader.scala | 35 ++ .../configuration/json/ParamReaderFactory.scala | 48 +++ .../measure/configuration/params/AllParam.scala | 36 ++ .../measure/configuration/params/DQParam.scala | 183 +++++++++ .../measure/configuration/params/EnvParam.scala | 97 +++++ .../measure/configuration/params/Param.scala | 29 ++ .../validator/ParamValidator.scala | 38 ++ .../griffin/measure/context/ContextId.scala | 28 ++ .../griffin/measure/context/DQContext.scala | 94 +++++ .../measure/context/DataFrameCache.scala | 72 ++++ .../griffin/measure/context/MetricWrapper.scala | 49 +++ .../griffin/measure/context/TableRegister.scala | 81 ++++ .../griffin/measure/context/TimeRange.scala | 48 +++ .../measure/context/datasource/DataSource.scala | 97 +++++ .../context/datasource/DataSourceFactory.scala | 66 +++ .../datasource/cache/DataSourceCache.scala | 366 
+++++++++++++++++ .../cache/DataSourceCacheFactory.scala | 68 ++++ .../datasource/cache/JsonDataSourceCache.scala | 40 ++ .../datasource/cache/OrcDataSourceCache.scala | 40 ++ .../cache/ParquetDataSourceCache.scala | 42 ++ .../context/datasource/cache/WithFanIn.scala | 69 ++++ .../datasource/connector/DataConnector.scala | 112 +++++ .../connector/DataConnectorFactory.scala | 125 ++++++ .../batch/AvroBatchDataConnector.scala | 71 ++++ .../connector/batch/BatchDataConnector.scala | 27 ++ .../batch/HiveBatchDataConnector.scala | 86 ++++ .../batch/TextDirBatchDataConnector.scala | 106 +++++ .../streaming/KafkaStreamingDataConnector.scala | 85 ++++ .../KafkaStreamingStringDataConnector.scala | 71 ++++ .../streaming/StreamingDataConnector.scala | 46 +++ .../datasource/info/DataSourceCacheable.scala | 88 ++++ .../context/datasource/info/TmstCache.scala | 47 +++ .../context/streaming/info/InfoCache.scala | 39 ++ .../streaming/info/InfoCacheFactory.scala | 41 ++ .../streaming/info/InfoCacheInstance.scala | 53 +++ .../context/streaming/info/TimeInfoCache.scala | 127 ++++++ .../context/streaming/info/ZKInfoCache.scala | 217 ++++++++++ .../context/streaming/lock/CacheLock.scala | 34 ++ .../context/streaming/lock/MultiCacheLock.scala | 39 ++ .../context/streaming/lock/ZKCacheLock.scala | 53 +++ .../streaming/metric/AccuracyMetric.scala | 54 +++ .../context/streaming/metric/CacheResults.scala | 78 ++++ .../context/streaming/metric/Metric.scala | 35 ++ .../measure/context/writer/HdfsPersist.scala | 190 +++++++++ .../measure/context/writer/HttpPersist.scala | 76 ++++ .../measure/context/writer/LoggerPersist.scala | 82 ++++ .../measure/context/writer/MongoPersist.scala | 118 ++++++ .../measure/context/writer/MultiPersists.scala | 83 ++++ .../measure/context/writer/Persist.scala | 45 ++ .../measure/context/writer/PersistFactory.scala | 55 +++ .../context/writer/PersistThreadPool.scala | 81 ++++ .../measure/data/connector/DataConnector.scala | 147 ------- 
.../data/connector/DataConnectorFactory.scala | 147 ------- .../batch/AvroBatchDataConnector.scala | 115 ------ .../connector/batch/BatchDataConnector.scala | 35 -- .../batch/HiveBatchDataConnector.scala | 165 -------- .../batch/TextDirBatchDataConnector.scala | 139 ------- .../streaming/KafkaStreamingDataConnector.scala | 87 ---- .../KafkaStreamingStringDataConnector.scala | 65 --- .../streaming/StreamingDataConnector.scala | 45 -- .../measure/data/source/DataSource.scala | 140 ------- .../measure/data/source/DataSourceFactory.scala | 80 ---- .../data/source/cache/DataCacheable.scala | 84 ---- .../data/source/cache/DataSourceCache.scala | 380 ----------------- .../source/cache/DataSourceCacheFactory.scala | 58 --- .../data/source/cache/JsonDataSourceCache.scala | 40 -- .../data/source/cache/OrcDataSourceCache.scala | 40 -- .../source/cache/ParquetDataSourceCache.scala | 40 -- .../measure/data/source/cache/WithFanIn.scala | 57 --- .../org/apache/griffin/measure/job/DQJob.scala | 32 ++ .../measure/job/builder/DQJobBuilder.scala | 68 ++++ .../apache/griffin/measure/launch/DQApp.scala | 52 +++ .../measure/launch/batch/BatchDQApp.scala | 107 +++++ .../launch/streaming/StreamingDQApp.scala | 177 ++++++++ .../launch/streaming/StreamingDQApp2.scala | 104 +++++ .../apache/griffin/measure/log/Loggable.scala | 43 -- .../griffin/measure/persist/HdfsPersist.scala | 344 ---------------- .../griffin/measure/persist/HttpPersist.scala | 115 ------ .../griffin/measure/persist/LoggerPersist.scala | 184 --------- .../griffin/measure/persist/MongoPersist.scala | 119 ------ .../measure/persist/MongoThreadPool.scala | 73 ---- .../griffin/measure/persist/MultiPersists.scala | 91 ----- .../griffin/measure/persist/Persist.scala | 57 --- .../measure/persist/PersistFactory.scala | 55 --- .../measure/persist/PersistThreadPool.scala | 62 --- .../measure/process/BatchDqProcess.scala | 177 -------- .../griffin/measure/process/DqProcess.scala | 45 -- .../griffin/measure/process/ExportMode.scala | 
34 -- .../griffin/measure/process/ProcessType.scala | 47 --- .../measure/process/StreamingDqProcess.scala | 180 -------- .../measure/process/StreamingDqThread.scala | 146 ------- .../griffin/measure/process/TimingProcess.scala | 46 --- .../process/engine/DataFrameOprEngine.scala | 179 -------- .../measure/process/engine/DqEngine.scala | 51 --- .../process/engine/DqEngineFactory.scala | 47 --- .../measure/process/engine/DqEngines.scala | 242 ----------- .../measure/process/engine/SparkDqEngine.scala | 183 --------- .../process/engine/SparkRowFormatter.scala | 62 --- .../measure/process/engine/SparkSqlEngine.scala | 76 ---- .../measure/process/temp/DataFrameCaches.scala | 133 ------ .../measure/process/temp/TableRegisters.scala | 153 ------- .../measure/process/temp/TableRegs.scala | 81 ---- .../measure/process/temp/TimeRange.scala | 48 --- .../griffin/measure/result/AccuracyResult.scala | 50 --- .../griffin/measure/result/DataInfo.scala | 50 --- .../griffin/measure/result/ProfileResult.scala | 48 --- .../apache/griffin/measure/result/Result.scala | 36 -- .../measure/rule/adaptor/AdaptPhase.scala | 25 -- .../rule/adaptor/DataFrameOprAdaptor.scala | 62 --- .../measure/rule/adaptor/GlobalKeys.scala | 27 -- .../rule/adaptor/GriffinDslAdaptor.scala | 73 ---- .../measure/rule/adaptor/InternalColumns.scala | 35 -- .../measure/rule/adaptor/RuleAdaptor.scala | 100 ----- .../measure/rule/adaptor/RuleAdaptorGroup.scala | 311 -------------- .../measure/rule/adaptor/SparkSqlAdaptor.scala | 55 --- .../griffin/measure/rule/dsl/CollectType.scala | 57 --- .../griffin/measure/rule/dsl/DqType.scala | 75 ---- .../griffin/measure/rule/dsl/DslType.scala | 53 --- .../griffin/measure/rule/dsl/PersistType.scala | 60 --- .../rule/dsl/analyzer/AccuracyAnalyzer.scala | 41 -- .../rule/dsl/analyzer/BasicAnalyzer.scala | 53 --- .../dsl/analyzer/CompletenessAnalyzer.scala | 46 --- .../dsl/analyzer/DistinctnessAnalyzer.scala | 47 --- .../rule/dsl/analyzer/ProfilingAnalyzer.scala | 42 -- 
.../rule/dsl/analyzer/TimelinessAnalyzer.scala | 65 --- .../rule/dsl/analyzer/UniquenessAnalyzer.scala | 46 --- .../measure/rule/dsl/expr/AliasableExpr.scala | 25 -- .../rule/dsl/expr/ClauseExpression.scala | 270 ------------ .../griffin/measure/rule/dsl/expr/Expr.scala | 32 -- .../griffin/measure/rule/dsl/expr/ExprTag.scala | 23 -- .../rule/dsl/expr/ExtraConditionExpr.scala | 27 -- .../measure/rule/dsl/expr/FunctionExpr.scala | 45 -- .../measure/rule/dsl/expr/LiteralExpr.scala | 72 ---- .../measure/rule/dsl/expr/LogicalExpr.scala | 204 ---------- .../measure/rule/dsl/expr/MathExpr.scala | 94 ----- .../measure/rule/dsl/expr/SelectExpr.scala | 132 ------ .../measure/rule/dsl/expr/TreeNode.scala | 45 -- .../measure/rule/dsl/parser/BasicParser.scala | 388 ------------------ .../rule/dsl/parser/GriffinDslParser.scala | 94 ----- .../griffin/measure/rule/plan/DfOprStep.scala | 32 -- .../griffin/measure/rule/plan/DsUpdate.scala | 24 -- .../measure/rule/plan/MetricExport.scala | 31 -- .../measure/rule/plan/RecordExport.scala | 31 -- .../griffin/measure/rule/plan/RuleExport.scala | 33 -- .../griffin/measure/rule/plan/RulePlan.scala | 59 --- .../griffin/measure/rule/plan/RuleStep.scala | 40 -- .../measure/rule/plan/SparkSqlStep.scala | 32 -- .../griffin/measure/rule/plan/TimeInfo.scala | 37 -- .../rule/preproc/PreProcRuleGenerator.scala | 72 ---- .../rule/trans/AccuracyRulePlanTrans.scala | 200 --------- .../rule/trans/CompletenessRulePlanTrans.scala | 145 ------- .../rule/trans/DistinctnessRulePlanTrans.scala | 348 ---------------- .../measure/rule/trans/DsUpdateFactory.scala | 37 -- .../rule/trans/ProfilingRulePlanTrans.scala | 100 ----- .../measure/rule/trans/RuleExportFactory.scala | 65 --- .../measure/rule/trans/RulePlanTrans.scala | 60 --- .../rule/trans/TimelinessRulePlanTrans.scala | 224 ---------- .../rule/trans/UniquenessRulePlanTrans.scala | 200 --------- .../griffin/measure/rule/udf/GriffinUdafs.scala | 29 -- .../griffin/measure/rule/udf/GriffinUdfs.scala | 43 -- 
.../griffin/measure/rule/udf/MeanUdaf.scala | 58 --- .../apache/griffin/measure/step/DQStep.scala | 32 ++ .../apache/griffin/measure/step/SeqDQStep.scala | 44 ++ .../builder/BatchDataSourceStepBuilder.scala | 31 ++ .../measure/step/builder/ConstantColumns.scala | 38 ++ .../measure/step/builder/DQStepBuilder.scala | 81 ++++ .../step/builder/DQStepNameGenerator.scala | 34 ++ .../builder/DataFrameOpsDQStepBuilder.scala | 35 ++ .../builder/DataSourceParamStepBuilder.scala | 44 ++ .../step/builder/GriffinDslDQStepBuilder.scala | 60 +++ .../step/builder/RuleParamStepBuilder.scala | 61 +++ .../step/builder/SparkSqlDQStepBuilder.scala | 35 ++ .../StreamingDataSourceStepBuilder.scala | 31 ++ .../step/builder/dsl/expr/AliasableExpr.scala | 25 ++ .../builder/dsl/expr/ClauseExpression.scala | 270 ++++++++++++ .../measure/step/builder/dsl/expr/Expr.scala | 35 ++ .../measure/step/builder/dsl/expr/ExprTag.scala | 23 ++ .../builder/dsl/expr/ExtraConditionExpr.scala | 27 ++ .../step/builder/dsl/expr/FunctionExpr.scala | 45 ++ .../step/builder/dsl/expr/LiteralExpr.scala | 72 ++++ .../step/builder/dsl/expr/LogicalExpr.scala | 204 ++++++++++ .../step/builder/dsl/expr/MathExpr.scala | 94 +++++ .../step/builder/dsl/expr/SelectExpr.scala | 132 ++++++ .../step/builder/dsl/expr/TreeNode.scala | 45 ++ .../step/builder/dsl/parser/BasicParser.scala | 391 ++++++++++++++++++ .../builder/dsl/parser/GriffinDslParser.scala | 98 +++++ .../dsl/transform/AccuracyExpr2DQSteps.scala | 200 +++++++++ .../transform/CompletenessExpr2DQSteps.scala | 157 +++++++ .../transform/DistinctnessExpr2DQSteps.scala | 358 ++++++++++++++++ .../builder/dsl/transform/Expr2DQSteps.scala | 59 +++ .../dsl/transform/ProfilingExpr2DQSteps.scala | 105 +++++ .../dsl/transform/TimelinessExpr2DQSteps.scala | 234 +++++++++++ .../dsl/transform/UniquenessExpr2DQSteps.scala | 204 ++++++++++ .../transform/analyzer/AccuracyAnalyzer.scala | 41 ++ .../dsl/transform/analyzer/BasicAnalyzer.scala | 55 +++ 
.../analyzer/CompletenessAnalyzer.scala | 46 +++ .../analyzer/DistinctnessAnalyzer.scala | 47 +++ .../transform/analyzer/ProfilingAnalyzer.scala | 42 ++ .../transform/analyzer/TimelinessAnalyzer.scala | 65 +++ .../transform/analyzer/UniquenessAnalyzer.scala | 46 +++ .../preproc/PreProcRuleParamGenerator.scala | 72 ++++ .../measure/step/builder/udf/GriffinUDFs.scala | 63 +++ .../griffin/measure/step/read/ReadStep.scala | 48 +++ .../measure/step/read/UnionReadStep.scala | 41 ++ .../measure/step/transform/DataFrameOps.scala | 134 ++++++ .../transform/DataFrameOpsTransformStep.scala | 52 +++ .../step/transform/SparkSqlTransformStep.scala | 47 +++ .../measure/step/transform/TransformStep.scala | 31 ++ .../step/write/DsCacheUpdateWriteStep.scala | 61 +++ .../measure/step/write/MetricFlushStep.scala | 45 ++ .../measure/step/write/MetricWriteStep.scala | 115 ++++++ .../measure/step/write/RecordWriteStep.scala | 151 +++++++ .../measure/step/write/SparkRowFormatter.scala | 65 +++ .../griffin/measure/step/write/WriteStep.scala | 31 ++ .../apache/griffin/measure/util/MailUtil.java | 74 ---- .../apache/griffin/measure/util/Md5Util.java | 56 --- .../griffin/measure/util/MessageUtil.java | 178 -------- .../griffin/measure/utils/DataFrameUtil.scala | 2 +- .../apache/griffin/measure/utils/FSUtil.scala | 82 ++++ .../measure/utils/HdfsFileDumpUtil.scala | 101 ----- .../apache/griffin/measure/utils/HdfsUtil.scala | 71 ++-- .../griffin/measure/utils/ParamUtil.scala | 12 + .../apache/griffin/measure/utils/TimeUtil.scala | 2 +- .../resources/_accuracy-batch-griffindsl.json | 56 --- .../resources/_accuracy-batch-sparksql.json | 63 --- .../_accuracy-streaming-griffindsl.json | 121 ------ .../resources/_accuracy-streaming-sparksql.json | 142 ------- .../_completeness-batch-griffindsl.json | 36 -- .../_completeness-streaming-griffindsl.json | 65 --- .../_distinctness-batch-griffindsl.json | 57 --- .../_distinctness-batch-griffindsl1.json | 73 ---- .../_distinctness-batch-griffindsl2.json | 
74 ---- .../_distinctness-streaming-griffindsl.json | 85 ---- .../_profiling-batch-griffindsl-hive.json | 48 --- .../resources/_profiling-batch-griffindsl.json | 54 --- .../resources/_profiling-batch-sparksql.json | 44 -- .../_profiling-streaming-griffindsl.json | 75 ---- .../_profiling-streaming-sparksql.json | 80 ---- .../resources/_timeliness-batch-griffindsl.json | 49 --- .../resources/_timeliness-batch-sparksql.json | 52 --- .../_timeliness-streaming-griffindsl.json | 79 ---- .../_timeliness-streaming-sparksql.json | 82 ---- .../resources/_uniqueness-batch-griffindsl.json | 58 --- .../_uniqueness-streaming-griffindsl.json | 119 ------ .../_uniqueness-streaming-sparksql.json | 130 ------ .../src/test/resources/config-griffindsl.json | 56 +++ measure/src/test/resources/config-profile.json | 19 - .../resources/config-streaming-accuracy.json | 121 ++++++ .../src/test/resources/config-streaming.json | 126 +++--- .../src/test/resources/config-streaming1.json | 65 --- .../src/test/resources/config-streaming2.json | 65 --- .../src/test/resources/config-streaming3.json | 65 --- .../resources/config-test-accuracy-new.json | 56 --- .../resources/config-test-accuracy-new2.json | 72 ---- .../config-test-accuracy-streaming-multids.json | 142 ------- .../config-test-accuracy-streaming-new.json | 117 ------ .../config-test-accuracy-streaming-new2.json | 133 ------ .../config-test-accuracy-streaming.json | 117 ------ .../test/resources/config-test-accuracy.json | 58 --- .../test/resources/config-test-accuracy2.json | 64 --- .../resources/config-test-profiling-new.json | 80 ---- .../resources/config-test-profiling-new2.json | 36 -- .../config-test-profiling-streaming-new.json | 85 ---- .../config-test-profiling-streaming-new2.json | 72 ---- .../config-test-profiling-streaming.json | 91 ----- .../test/resources/config-test-profiling.json | 36 -- .../test/resources/config-test-profiling1.json | 60 --- .../test/resources/config-test-profiling2.json | 35 -- 
measure/src/test/resources/config-test.json | 55 --- measure/src/test/resources/config-test1.json | 96 ----- measure/src/test/resources/config.json | 82 +++- measure/src/test/resources/config1.json | 29 -- measure/src/test/resources/dupdata.avro | Bin 304 -> 0 bytes measure/src/test/resources/empty.avro | Bin 215 -> 0 bytes measure/src/test/resources/env-hdfs-test.json | 45 -- .../src/test/resources/env-streaming-mongo.json | 54 --- measure/src/test/resources/env-test.json | 39 -- measure/src/test/resources/env.json | 20 +- measure/src/test/resources/env1.json | 21 - .../resources/performance-test-accuracy.json | 56 --- .../resources/performance-test-profiling.json | 34 -- measure/src/test/resources/test-data0.json | 56 --- measure/src/test/resources/users_info_src.dat | 50 --- .../src/test/resources/users_info_target.dat | 50 --- .../griffin/measure/ApplicationTest.scala | 25 ++ .../reader/ParamRawStringReaderTest.scala | 37 -- .../validator/AllParamValidatorTest.scala | 40 -- .../measure/data/connector/ConnectorTest.scala | 97 ----- .../measure/persist/HdfsPersistTest.scala | 48 --- .../measure/persist/HttpPersistTest.scala | 42 -- .../measure/persist/MongoPersistTest.scala | 47 --- .../measure/result/AccuracyResultTest.scala | 57 --- .../measure/result/ProfileResultTest.scala | 57 --- .../rule/adaptor/GriffinDslAdaptorTest.scala | 178 -------- .../rule/adaptor/RuleAdaptorGroupTest.scala | 70 ---- .../rule/adaptor/SparkSqlAdaptorTest.scala | 59 --- .../rule/dsl/parser/BasicParserTest.scala | 234 ----------- .../measure/rule/udf/GriffinUdfsTest.scala | 52 --- .../griffin/measure/rule/udf/MeanUdafTest.scala | 117 ------ .../griffin/measure/utils/HdfsUtilTest.scala | 132 ------ .../griffin/measure/utils/ParamUtilTest.scala | 50 --- .../griffin/measure/utils/TimeUtilTest.scala | 38 -- 348 files changed, 10318 insertions(+), 17321 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/pom.xml ---------------------------------------------------------------------- diff --git a/measure/pom.xml b/measure/pom.xml index 04402af..79cf3cb 100644 --- a/measure/pom.xml +++ b/measure/pom.xml @@ -18,211 +18,205 @@ specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.griffin</groupId> - <artifactId>griffin</artifactId> - <version>0.2.0-incubating-SNAPSHOT</version> - </parent> - - <artifactId>measure</artifactId> - <packaging>jar</packaging> - - <name>Apache Griffin :: Measures</name> - <url>http://maven.apache.org</url> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - - <maven.compiler.source>1.8</maven.compiler.source> - <maven.compiler.target>1.8</maven.compiler.target> - - <scala.version>2.11.8</scala.version> - <spark.version>2.2.1</spark.version> - <scala.binary.version>2.11</scala.binary.version> - - <avro.version>1.7.7</avro.version> - <jackson.version>2.8.7</jackson.version> - <scalaj.version>2.3.0</scalaj.version> - <mongo.version>2.1.0</mongo.version> - <junit.version>4.11</junit.version> - <scalatest.version>3.0.0</scalatest.version> - <slf4j.version>1.7.21</slf4j.version> - <log4j.version>1.2.16</log4j.version> - <curator.version>2.10.0</curator.version> - <scalamock.version>3.6.0</scalamock.version> - </properties> - - <dependencies> - <!--scala--> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - </dependency> - - <!--spark, spark streaming, spark hive--> - <dependency> - <groupId>org.apache.spark</groupId> - 
<artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - </dependency> - - <!--jackson--> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.module</groupId> - <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> - <version>${jackson.version}</version> - </dependency> - - <!--scalaj for http request--> - <dependency> - <groupId>org.scalaj</groupId> - <artifactId>scalaj-http_${scala.binary.version}</artifactId> - <version>${scalaj.version}</version> - </dependency> - - <!--mongo request--> - <dependency> - <groupId>org.mongodb.scala</groupId> - <artifactId>mongo-scala-driver_2.11</artifactId> - <version>${mongo.version}</version> - </dependency> - - <!--avro--> - <dependency> - <groupId>com.databricks</groupId> - <artifactId>spark-avro_${scala.binary.version}</artifactId> - <version>4.0.0</version> - </dependency> - - <!--log4j--> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> - </dependency> - <!--<dependency>--> - <!--<groupId>org.slf4j</groupId>--> - <!--<artifactId>slf4j-simple</artifactId>--> - <!--<version>${slf4j.version}</version>--> - <!--</dependency>--> - <dependency> - 
<groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j.version}</version> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>${log4j.version}</version> - </dependency> - - <!--curator--> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <version>${curator.version}</version> - </dependency> - - <!--junit--> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>test</scope> - </dependency> - - <!--scala test--> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <version>${scalatest.version}</version> - <scope>test</scope> - </dependency> - - <!--scala mock--> - <dependency> - <groupId>org.scalamock</groupId> - <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId> - <version>${scalamock.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.sun.mail</groupId> - <artifactId>javax.mail</artifactId> - <version>1.4.4</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.5.1</version> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - <appendAssemblyId>false</appendAssemblyId> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - 
<goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.griffin</groupId> + <artifactId>griffin</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>measure</artifactId> + <packaging>jar</packaging> + + <name>Apache Griffin :: Measures</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + + <scala.version>2.11.8</scala.version> + <spark.version>2.2.1</spark.version> + <scala.binary.version>2.11</scala.binary.version> + + <avro.version>1.7.7</avro.version> + <jackson.version>2.8.7</jackson.version> + <scalaj.version>2.3.0</scalaj.version> + <mongo.version>2.1.0</mongo.version> + <junit.version>4.11</junit.version> + <scalatest.version>3.0.0</scalatest.version> + <slf4j.version>1.7.21</slf4j.version> + <log4j.version>1.2.16</log4j.version> + <curator.version>2.10.0</curator.version> + <scalamock.version>3.6.0</scalamock.version> + </properties> + + <dependencies> + <!--scala--> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + + <!--spark, spark streaming, spark hive--> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> 
+ </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + + <!--jackson--> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> + <version>${jackson.version}</version> + </dependency> + + <!--scalaj for http request--> + <dependency> + <groupId>org.scalaj</groupId> + <artifactId>scalaj-http_${scala.binary.version}</artifactId> + <version>${scalaj.version}</version> + </dependency> + + <!--mongo request--> + <dependency> + <groupId>org.mongodb.scala</groupId> + <artifactId>mongo-scala-driver_2.11</artifactId> + <version>${mongo.version}</version> + </dependency> + + <!--avro--> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-avro_${scala.binary.version}</artifactId> + <version>4.0.0</version> + </dependency> + + <!--log4j--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-simple</artifactId>--> + <!--<version>${slf4j.version}</version>--> + <!--</dependency>--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>${log4j.version}</version> + </dependency> + + <!--curator--> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + + <!--junit--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + 
<version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <!--scala test--> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>${scalatest.version}</version> + <scope>test</scope> + </dependency> + + <!--scala mock--> + <dependency> + <groupId>org.scalamock</groupId> + <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId> + <version>${scalamock.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config-old.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-old.json b/measure/src/main/resources/config-old.json deleted file mode 100644 index ab32b75..0000000 --- a/measure/src/main/resources/config-old.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "name": "accu1", - "type": "accuracy", - - "process.type": "batch", - - "source": { - 
"type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "users_info_src", - "partitions": "dt=23123, hour=432; dt=35464, hour=4657" - } - }, - - "target": { - "type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "users_info_target", - "partitions": "dt=23123, hour=432; dt=35464, hour=4657" - } - }, - - "evaluateRule": { - "sampleRatio": 0.2, - "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config-sql.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-sql.json b/measure/src/main/resources/config-sql.json deleted file mode 100644 index aad9584..0000000 --- a/measure/src/main/resources/config-sql.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "name": "accu1", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "users_info_src", - "partitions": "dt=23123, hour=432; dt=35464, hour=4657" - } - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "users_info_target", - "partitions": "dt=23123, hour=432; dt=35464, hour=4657" - } - } - ] - } - ], - - "evaluateRule": { - "dsl.type": "spark-sql", - "rules": [ - { - "name": "miss.record", - "rule": "SELECT source.name FROM source LEFT JOIN target ON coalesce(source.name, 'null') = coalesce(target.name, 'null') WHERE (NOT (source.name IS NULL)) AND (target.name IS NULL)", - "persist.type": "record" - }, { 
- "name": "miss.count", - "rule": "SELECT COUNT(*) FROM miss", - "persist.type": "metric" - }, { - "name": "total.count", - "rule": "SELECT COUNT(*) FROM source", - "persist.type": "metric" - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json deleted file mode 100644 index 1705174..0000000 --- a/measure/src/main/resources/config-streaming.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "name": "accu2", - "type": "accuracy", - - "process.type": "steaming", - - "source": { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "a.b.c.d:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false", - }, - "topics": "src", - "key.type": "java.lang.String", - "value.type": "java.lang.String", - "cache": { - "type": "df", - "config": { - "table.name": "source", - "info.path": "src", - "ready.time.interval": "1m", - "ready.time.delay": "1m" - } - } - } - }, - - "target": { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "a.b.c.d:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false", - }, - "topics": "tgt", - "key.type": "java.lang.String", - "value.type": "java.lang.String", - "cache": { - "type": "hive", - "version": 1.2, - "config": { - "database": "default", - "table.name": "target_table", - "info.path": "tgt" - } - } - } - }, - - "evaluateRule": { - "sampleRatio": 0.2, - "rules": "$source.json().seeds[*].json().url = $target.json().groups[0].attrsList['name' = 'URL'].values[0] AND $source.json().seeds[*].json().metadata.json().tracker.crawlRequestCreateTS = $target.json().groups[0].attrsList['name' = 
'CRAWLMETADATA'].values[0].json().tracker.crawlRequestCreateTS WHEN $source._timestamp_ + 24h < $target._timestamp_" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config.json b/measure/src/main/resources/config.json deleted file mode 100644 index 1edb299..0000000 --- a/measure/src/main/resources/config.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "name": "accu1", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "users_info_src", - "partitions": "dt=23123, hour=432; dt=35464, hour=4657" - } - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "users_info_target", - "partitions": "dt=23123, hour=432; dt=35464, hour=4657" - } - } - ] - } - ], - "evaluateRule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "accuracy", - "rule": "source.user_id = target.user_id AND source.first_name = target.first_name AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", - "details": { - "source": "source", - "miss.record": { - "name": "miss.record", - "persist.type": "record" - }, - "miss.count": { - "name": "miss.count", - "persist.type": "metric" - }, - "total.count": { - "name": "total.count", - "persist.type": "metric" - } - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/env.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/env.json 
b/measure/src/main/resources/env.json deleted file mode 100644 index 1081b6a..0000000 --- a/measure/src/main/resources/env.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "spark": { - "log.level": "INFO", - "checkpoint.dir": "hdfs:///griffin/batch/cp", - "batch.interval": "10s", - "process.interval": "10m", - "config": {} - }, - - "persist": [ - { - "type": "hdfs", - "config": { - "path": "hdfs:///griffin/streaming/persist", - "max.persist.lines": 10000, - "max.lines.per.file": 10000 - } - }, - { - "type": "http", - "config": { - "method": "post", - "api": "http://HOSTNAME:9200/griffin/accuracy" - } - } - ], - - "info.cache": [ - { - "type": "zk", - "config": { - "hosts": "localhost:2181", - "namespace": "griffin/infocache", - "lock.path": "lock", - "mode": "persist", - "init.clear": true, - "close.clear": false - } - } - ], - "mail":[ - { - "host":"smtp.163.com",//mail host(163) - "mail":"[email protected]",//mail - "user":"[email protected]",//mail user - "password":"xxx"//mail password - } - ], - "sms":[ - { - "host":"xxx",//meassage url - "id":"xx", - "key":"xxx", - "UUID":"xxx" - } - ], - "cleaner": { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/Application.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala index 43781f2..25dc34e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala @@ -18,54 +18,51 @@ under the License. 
*/ package org.apache.griffin.measure -import org.apache.griffin.measure.config.params._ -import org.apache.griffin.measure.config.params.env._ -import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.config.reader._ -import org.apache.griffin.measure.config.validator._ -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.persist.PersistThreadPool -import org.apache.griffin.measure.process._ +import org.apache.griffin.measure.configuration.enums._ +import org.apache.griffin.measure.configuration.json.ParamReaderFactory +import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, EnvParam, Param} +import org.apache.griffin.measure.configuration.validator.ParamValidator +import org.apache.griffin.measure.context.writer.PersistThreadPool +import org.apache.griffin.measure.launch.DQApp +import org.apache.griffin.measure.launch.batch.BatchDQApp +import org.apache.griffin.measure.launch.streaming.StreamingDQApp import scala.util.{Failure, Success, Try} +/** + * application entrance + */ object Application extends Loggable { def main(args: Array[String]): Unit = { info(args.toString) if (args.length < 2) { - error("Usage: class <env-param> <user-param> [List of String split by comma: raw | local | hdfs(default)]") + error("Usage: class <env-param> <dq-param>") sys.exit(-1) } val envParamFile = args(0) - val userParamFile = args(1) - val (envFsType, userFsType) = if (args.length > 2) { - val fsTypes = args(2).trim.split(",") - if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim) - else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim) - else ("hdfs", "hdfs") - } else ("hdfs", "hdfs") + val dqParamFile = args(1) info(envParamFile) - info(userParamFile) + info(dqParamFile) // read param files - val envParam = readParamFile[EnvParam](envParamFile, envFsType) match { + val envParam = readParamFile[EnvParam](envParamFile) match { case Success(p) => p case Failure(ex) => { 
error(ex.getMessage) sys.exit(-2) } } - val userParam = readParamFile[UserParam](userParamFile, userFsType) match { + val dqParam = readParamFile[DQParam](dqParamFile) match { case Success(p) => p case Failure(ex) => { error(ex.getMessage) sys.exit(-2) } } - val allParam: AllParam = AllParam(envParam, userParam) + val allParam: AllParam = AllParam(envParam, dqParam) // validate param files ParamValidator.validate(allParam) match { @@ -78,18 +75,19 @@ object Application extends Loggable { } } - // choose algorithm -// val dqType = allParam.userParam.dqType - val procType = ProcessType(allParam.userParam.procType) - val proc: DqProcess = procType match { - case BatchProcessType => BatchDqProcess(allParam) - case StreamingProcessType => StreamingDqProcess(allParam) + // choose process + val procType = ProcessType(allParam.dqParam.procType) + val proc: DQApp = procType match { + case BatchProcessType => BatchDQApp(allParam) + case StreamingProcessType => StreamingDQApp(allParam) case _ => { error(s"${procType} is unsupported process type!") sys.exit(-4) } } + startup + // process init proc.init match { case Success(_) => { @@ -110,7 +108,7 @@ object Application extends Loggable { case Failure(ex) => { error(s"process run error: ${ex.getMessage}") - if (proc.retriable) { + if (proc.retryable) { throw ex } else { shutdown @@ -120,7 +118,7 @@ object Application extends Loggable { } // process end - proc.end match { + proc.close match { case Success(_) => { info("process end success") } @@ -132,45 +130,17 @@ object Application extends Loggable { } shutdown - -// val algo: Algo = (dqType, procType) match { -// case (MeasureType.accuracy(), ProcessType.batch()) => BatchAccuracyAlgo(allParam) -// case (MeasureType.profile(), ProcessType.batch()) => BatchProfileAlgo(allParam) -// case (MeasureType.accuracy(), ProcessType.streaming()) => StreamingAccuracyAlgo(allParam) -//// case (MeasureType.profile(), ProcessType.streaming()) => StreamingProfileAlgo(allParam) -// case _ => { 
-// error(s"${dqType} with ${procType} is unsupported dq type!") -// sys.exit(-4) -// } -// } - - // algorithm run -// algo.run match { -// case Failure(ex) => { -// error(s"app error: ${ex.getMessage}") -// -// procType match { -// case ProcessType.streaming() => { -// // streaming need to attempt more times by spark streaming itself -// throw ex -// } -// case _ => { -// shutdown -// sys.exit(-5) -// } -// } -// } -// case _ => { -// info("app finished and success") -// } -// } } - private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { - val paramReader = ParamReaderFactory.getParamReader(file, fsType) + private def readParamFile[T <: Param](file: String)(implicit m : Manifest[T]): Try[T] = { + val paramReader = ParamReaderFactory.getParamReader(file) paramReader.readConfig[T] } + private def startup(): Unit = { + PersistThreadPool.start + } + private def shutdown(): Unit = { PersistThreadPool.shutdown } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala b/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala new file mode 100644 index 0000000..961fd25 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala @@ -0,0 +1,43 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure + +import org.slf4j.LoggerFactory + +trait Loggable { + + @transient private lazy val logger = LoggerFactory.getLogger(getClass) + + protected def info(msg: String): Unit = { + logger.info(msg) + } + + protected def debug(msg: String): Unit = { + logger.debug(msg) + } + + protected def warn(msg: String): Unit = { + logger.warn(msg) + } + + protected def error(msg: String): Unit = { + logger.error(msg) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala deleted file mode 100644 index 8e4b25d..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.info - -import org.apache.griffin.measure.cache.lock.CacheLock -import org.apache.griffin.measure.log.Loggable - -trait InfoCache extends Loggable with Serializable { - - def init(): Unit - def available(): Boolean - def close(): Unit - - def cacheInfo(info: Map[String, String]): Boolean - def readInfo(keys: Iterable[String]): Map[String, String] - def deleteInfo(keys: Iterable[String]): Unit - def clearInfo(): Unit - - def listKeys(path: String): List[String] - - def genLock(s: String): CacheLock - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala deleted file mode 100644 index 3c9d70a..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.info - -import org.apache.griffin.measure.config.params.env.InfoCacheParam - -import scala.util.{Success, Try} - -case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricName: String) extends Serializable { - - val ZK_REGEX = """^(?i)zk|zookeeper$""".r - - def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = { - val config = infoCacheParam.config - val infoCacheTry = infoCacheParam.persistType match { - case ZK_REGEX() => Try(ZKInfoCache(config, metricName)) - case _ => throw new Exception("not supported info cache type") - } - infoCacheTry match { - case Success(infoCache) => Some(infoCache) - case _ => None - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala deleted file mode 100644 index b116fca..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.info - -import org.apache.griffin.measure.cache.lock.{CacheLock, MultiCacheLock} -import org.apache.griffin.measure.config.params.env.InfoCacheParam - -object InfoCacheInstance extends InfoCache { - var infoCaches: List[InfoCache] = Nil - - def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: String) = { - val fac = InfoCacheFactory(infoCacheParams, metricName) - infoCaches = infoCacheParams.flatMap(param => fac.getInfoCache(param)).toList - } - - def init(): Unit = infoCaches.foreach(_.init) - def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available) - def close(): Unit = infoCaches.foreach(_.close) - - def cacheInfo(info: Map[String, String]): Boolean = { - infoCaches.foldLeft(false) { (res, infoCache) => res || infoCache.cacheInfo(info) } - } - def readInfo(keys: Iterable[String]): Map[String, String] = { - val maps = infoCaches.map(_.readInfo(keys)).reverse - maps.fold(Map[String, String]())(_ ++ _) - } - def deleteInfo(keys: Iterable[String]): Unit = infoCaches.foreach(_.deleteInfo(keys)) - def clearInfo(): Unit = infoCaches.foreach(_.clearInfo) - - def listKeys(path: String): List[String] = { - infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) => - if (res.nonEmpty) res else infoCache.listKeys(path) - } - } - - def genLock(s: String): CacheLock = MultiCacheLock(infoCaches.map(_.genLock(s))) -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala deleted file mode 100644 index c976453..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.cache.info - -import org.apache.griffin.measure.log.Loggable - -object TimeInfoCache extends Loggable with Serializable { - - private val CacheTime = "cache.time" - private val LastProcTime = "last.proc.time" - private val ReadyTime = "ready.time" - private val CleanTime = "clean.time" - private val OldCacheIndex = "old.cache.index" - - def cacheTime(path: String): String = s"${path}/${CacheTime}" - def lastProcTime(path: String): String = s"${path}/${LastProcTime}" - def readyTime(path: String): String = s"${path}/${ReadyTime}" - def cleanTime(path: String): String = s"${path}/${CleanTime}" - def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}" - - val infoPath = "info" - - val finalCacheInfoPath = "info.final" - val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}" - val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}" - val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}" - - def startTimeInfoCache(): Unit = { - genFinalReadyTime - } - - def getTimeRange(): (Long, Long) = { - readTimeRange - } - - def getCleanTime(): Long = { - readCleanTime - } - - def endTimeInfoCache: Unit = { - genFinalLastProcTime - genFinalCleanTime - } - - private def genFinalReadyTime(): Unit = { - val subPath = InfoCacheInstance.listKeys(infoPath) - val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" } - val result = InfoCacheInstance.readInfo(keys) - val times = keys.flatMap { k => - getLongOpt(result, k) - } - if (times.nonEmpty) { - val time = times.min - val map = Map[String, String]((finalReadyTime -> time.toString)) - InfoCacheInstance.cacheInfo(map) - } - } - - private def genFinalLastProcTime(): Unit = { - val subPath = InfoCacheInstance.listKeys(infoPath) - val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" } - val result = InfoCacheInstance.readInfo(keys) - val times = keys.flatMap { k => - getLongOpt(result, k) - } - if (times.nonEmpty) { - val time = times.min - val map 
= Map[String, String]((finalLastProcTime -> time.toString)) - InfoCacheInstance.cacheInfo(map) - } - } - - private def genFinalCleanTime(): Unit = { - val subPath = InfoCacheInstance.listKeys(infoPath) - val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" } - val result = InfoCacheInstance.readInfo(keys) - val times = keys.flatMap { k => - getLongOpt(result, k) - } - if (times.nonEmpty) { - val time = times.min - val map = Map[String, String]((finalCleanTime -> time.toString)) - InfoCacheInstance.cacheInfo(map) - } - } - - private def readTimeRange(): (Long, Long) = { - val map = InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime)) - val lastProcTime = getLong(map, finalLastProcTime) - val curReadyTime = getLong(map, finalReadyTime) - (lastProcTime, curReadyTime) - } - - private def readCleanTime(): Long = { - val map = InfoCacheInstance.readInfo(List(finalCleanTime)) - val cleanTime = getLong(map, finalCleanTime) - cleanTime - } - - private def getLongOpt(map: Map[String, String], key: String): Option[Long] = { - try { - map.get(key).map(_.toLong) - } catch { - case e: Throwable => None - } - } - private def getLong(map: Map[String, String], key: String) = { - getLongOpt(map, key).getOrElse(-1L) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala deleted file mode 100644 index 3789a05..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala +++ /dev/null @@ -1,212 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.info - -import org.apache.curator.framework.imps.CuratorFrameworkState -import org.apache.curator.framework.recipes.locks.InterProcessMutex -import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -import org.apache.curator.retry.ExponentialBackoffRetry -import org.apache.curator.utils.ZKPaths -import org.apache.griffin.measure.cache.lock.ZKCacheLock -import org.apache.zookeeper.CreateMode - -import scala.collection.JavaConverters._ - -case class ZKInfoCache(config: Map[String, Any], metricName: String) extends InfoCache { - - val Hosts = "hosts" - val Namespace = "namespace" - val Mode = "mode" - val InitClear = "init.clear" - val CloseClear = "close.clear" - val LockPath = "lock.path" - - val PersistRegex = """^(?i)persist$""".r - val EphemeralRegex = """^(?i)ephemeral$""".r - - final val separator = ZKPaths.PATH_SEPARATOR - - val hosts = config.getOrElse(Hosts, "").toString - val namespace = config.getOrElse(Namespace, "").toString - val mode: CreateMode = config.get(Mode) match { - case Some(s: String) => s match { - case PersistRegex() => CreateMode.PERSISTENT - case EphemeralRegex() => CreateMode.EPHEMERAL - case _ => CreateMode.PERSISTENT - } - case _ => CreateMode.PERSISTENT - } - val initClear = config.get(InitClear) match { - case 
Some(b: Boolean) => b - case _ => true - } - val closeClear = config.get(CloseClear) match { - case Some(b: Boolean) => b - case _ => false - } - val lockPath = config.getOrElse(LockPath, "lock").toString - - private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName - private val builder = CuratorFrameworkFactory.builder() - .connectString(hosts) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .namespace(cacheNamespace) - private val client: CuratorFramework = builder.build - - def init(): Unit = { - client.start() - info("start zk info cache") - client.usingNamespace(cacheNamespace) - info(s"init with namespace: ${cacheNamespace}") - deleteInfo(lockPath :: Nil) - if (initClear) { - clearInfo - } - } - - def available(): Boolean = { - client.getState match { - case CuratorFrameworkState.STARTED => true - case _ => false - } - } - - def close(): Unit = { - if (closeClear) { - clearInfo - } - info("close zk info cache") - client.close() - } - - def cacheInfo(info: Map[String, String]): Boolean = { - info.foldLeft(true) { (rs, pair) => - val (k, v) = pair - createOrUpdate(path(k), v) && rs - } - } - - def readInfo(keys: Iterable[String]): Map[String, String] = { - keys.flatMap { key => - read(path(key)) match { - case Some(v) => Some((key, v)) - case _ => None - } - }.toMap - } - - def deleteInfo(keys: Iterable[String]): Unit = { - keys.foreach { key => delete(path(key)) } - } - - def clearInfo(): Unit = { -// delete("/") - deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil) - deleteInfo(TimeInfoCache.infoPath :: Nil) - println("clear info") - } - - def listKeys(p: String): List[String] = { - children(path(p)) - } - - def genLock(s: String): ZKCacheLock = { - val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s - ZKCacheLock(new InterProcessMutex(client, lpt)) - } - - private def path(k: String): String = { - if (k.startsWith(separator)) k else separator + k - } - - private def 
children(path: String): List[String] = { - try { - client.getChildren().forPath(path).asScala.toList - } catch { - case e: Throwable => { - warn(s"list ${path} warn: ${e.getMessage}") - Nil - } - } - } - - private def createOrUpdate(path: String, content: String): Boolean = { - if (checkExists(path)) { - update(path, content) - } else { - create(path, content) - } - } - - private def create(path: String, content: String): Boolean = { - try { - client.create().creatingParentsIfNeeded().withMode(mode) - .forPath(path, content.getBytes("utf-8")) - true - } catch { - case e: Throwable => { - error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}") - false - } - } - } - - private def update(path: String, content: String): Boolean = { - try { - client.setData().forPath(path, content.getBytes("utf-8")) - true - } catch { - case e: Throwable => { - error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}") - false - } - } - } - - private def read(path: String): Option[String] = { - try { - Some(new String(client.getData().forPath(path), "utf-8")) - } catch { - case e: Throwable => { - warn(s"read ${path} warn: ${e.getMessage}") - None - } - } - } - - private def delete(path: String): Unit = { - try { - client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path) - } catch { - case e: Throwable => error(s"delete ${path} error: ${e.getMessage}") - } - } - - private def checkExists(path: String): Boolean = { - try { - client.checkExists().forPath(path) != null - } catch { - case e: Throwable => { - warn(s"check exists ${path} warn: ${e.getMessage}") - false - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala deleted file mode 100644 index 24f142c..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.lock - -import java.util.concurrent.TimeUnit - -import org.apache.griffin.measure.log.Loggable - -trait CacheLock extends Loggable with Serializable { - - def lock(outtime: Long, unit: TimeUnit): Boolean - - def unlock(): Unit - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala deleted file mode 100644 index 7b835f4..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.lock - -import java.util.concurrent.TimeUnit - -case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock { - - def lock(outtime: Long, unit: TimeUnit): Boolean = { - cacheLocks.headOption match { - case Some(cl) => cl.lock(outtime, unit) - case None => true - } - } - - def unlock(): Unit = { - cacheLocks.headOption match { - case Some(cl) => cl.unlock - case None => {} - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala deleted file mode 100644 index 77ee83b..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.lock - -import java.util.concurrent.TimeUnit - -import org.apache.curator.framework.recipes.locks.InterProcessMutex - -case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock { - - def lock(outtime: Long, unit: TimeUnit): Boolean = { - try { - if (outtime >= 0) { - mutex.acquire(outtime, unit) - } else { - mutex.acquire(-1, null) - } - } catch { - case e: Throwable => { - error(s"lock error: ${e.getMessage}") - false - } - } - - } - - def unlock(): Unit = { - try { - if (mutex.isAcquiredInThisProcess) mutex.release - } catch { - case e: Throwable => { - error(s"unlock error: ${e.getMessage}") - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala deleted file mode 100644 index 92787be..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.result - -import org.apache.griffin.measure.result.Result - -case class CacheResult(timeGroup: Long, updateTime: Long, result: Result) { - - def olderThan(ut: Long): Boolean = { - updateTime < ut - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala deleted file mode 100644 index 0511c04..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.result - -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.result._ - -import scala.collection.mutable.{Map => MutableMap} - -object CacheResultProcesser extends Loggable { - - private val cacheGroup: MutableMap[Long, CacheResult] = MutableMap() - - def genUpdateCacheResult(timeGroup: Long, updateTime: Long, result: Result): Option[CacheResult] = { - cacheGroup.get(timeGroup) match { - case Some(cr) => { - if (cr.olderThan(updateTime)) { - val existResult = cr.result - val newResult = existResult.update(result.asInstanceOf[existResult.T]) - if (existResult.differsFrom(newResult)) { - Some(CacheResult(timeGroup, updateTime, newResult)) - } else None - } else None - } - case _ => { - Some(CacheResult(timeGroup, updateTime, result)) - } - } - } - - def update(cr: CacheResult): Unit = { - val t = cr.timeGroup - cacheGroup.get(t) match { - case Some(c) => { - if (c.olderThan(cr.updateTime)) cacheGroup += (t -> cr) - } - case _ => cacheGroup += (t -> cr) - } - } - - def getCacheResult(timeGroup: Long): Option[CacheResult] = { - cacheGroup.get(timeGroup) - } - - def refresh(overtime: Long): Unit = { - val curCacheGroup = cacheGroup.toMap - val deadCache = curCacheGroup.filter { pr => - val (_, cr) = pr - cr.timeGroup < overtime || cr.result.eventual() - } - info(s"=== dead cache group count: ${deadCache.size} ===") - deadCache.keySet.foreach(cacheGroup -= _) - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala deleted file mode 100644 index 7a570ec..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.cache.tmst - -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.rule.plan.TimeInfo - -object TempName extends Loggable { - - def tmstName(name: String, ms: Long) = { - s"${name}_${ms}" - } - - //-- temp df name -- -// private val tmstNameRegex = """^(.*)\((\d*)\)\[(\d*)\]$""".r -// private val tmstNameRegex = """^(.*)_(\d*)_(\d*)$""".r -// def tmstName(name: String, timeInfo: TimeInfo) = { -// val calcTime = timeInfo.calcTime -// val tmst = timeInfo.tmst -// s"${name}_${calcTime}_${tmst}" -// } -// def extractTmstName(tmstName: String): (String, Option[Long], Option[Long]) = { -// tmstName match { -// case tmstNameRegex(name, calcTime, tmst) => { -// try { (name, Some(calcTime.toLong), Some(tmst.toLong)) } catch { case e: Throwable => (tmstName, None, None) } -// } -// case _ => (tmstName, None, None) -// } -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala deleted file mode 100644 index f031449..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.cache.tmst - -import java.util.concurrent.atomic.AtomicLong - -import org.apache.griffin.measure.log.Loggable - -import scala.collection.mutable.{SortedSet => MutableSortedSet} - - -case class TmstCache() extends Loggable { - - private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long] - - //-- insert tmst into tmst group -- - def insert(tmst: Long) = tmstGroup += tmst - def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts - - //-- remove tmst from tmst group -- - def remove(tmst: Long) = tmstGroup -= tmst - def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts - - //-- get subset of tmst group -- - def range(from: Long, until: Long) = tmstGroup.range(from, until).toSet - def until(until: Long) = tmstGroup.until(until).toSet - def from(from: Long) = tmstGroup.from(from).toSet - def all = tmstGroup.toSet - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala deleted file mode 100644 index 8990564..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params - -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import com.fasterxml.jackson.annotation.JsonInclude.Include -import org.apache.griffin.measure.config.params.env._ -import org.apache.griffin.measure.config.params.user._ - -// simply composite of env and user params, for convenient usage -@JsonInclude(Include.NON_NULL) -case class AllParam( @JsonProperty("env") envParam: EnvParam, - @JsonProperty("user") userParam: UserParam - ) extends Param { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala deleted file mode 100644 index d3484a1..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.config.params - -trait Param extends Serializable { - - def validate(): Boolean = true - -}
