Refactored the 'measure' module to have a clearer structure

Author: Grant Guo <[email protected]>

Closes #287 from grant-xuexu/feature/refactoring.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1d7acd57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1d7acd57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1d7acd57

Branch: refs/heads/master
Commit: 1d7acd57a9dbbaec4470d948f7d395481b84e19d
Parents: 4ca4cdb
Author: Grant Guo <[email protected]>
Authored: Tue May 29 12:22:31 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Tue May 29 12:22:31 2018 +0800

----------------------------------------------------------------------
 measure/pom.xml                                 | 408 +++++++++----------
 measure/src/main/resources/config-old.json      |  31 --
 measure/src/main/resources/config-sql.json      |  54 ---
 .../src/main/resources/config-streaming.json    |  61 ---
 measure/src/main/resources/config.json          |  59 ---
 measure/src/main/resources/env.json             |  60 ---
 .../apache/griffin/measure/Application.scala    |  94 ++---
 .../org/apache/griffin/measure/Loggable.scala   |  43 ++
 .../griffin/measure/cache/info/InfoCache.scala  |  39 --
 .../measure/cache/info/InfoCacheFactory.scala   |  41 --
 .../measure/cache/info/InfoCacheInstance.scala  |  53 ---
 .../measure/cache/info/TimeInfoCache.scala      | 127 ------
 .../measure/cache/info/ZKInfoCache.scala        | 212 ----------
 .../griffin/measure/cache/lock/CacheLock.scala  |  31 --
 .../measure/cache/lock/MultiCacheLock.scala     |  39 --
 .../measure/cache/lock/ZKCacheLock.scala        |  53 ---
 .../measure/cache/result/CacheResult.scala      |  29 --
 .../cache/result/CacheResultProcesser.scala     |  71 ----
 .../griffin/measure/cache/tmst/TempName.scala   |  47 ---
 .../griffin/measure/cache/tmst/TmstCache.scala  |  46 ---
 .../measure/config/params/AllParam.scala        |  32 --
 .../griffin/measure/config/params/Param.scala   |  25 --
 .../config/params/env/CleanerParam.scala        |  29 --
 .../measure/config/params/env/EmailParam.scala  |  36 --
 .../measure/config/params/env/EnvParam.scala    |  34 --
 .../config/params/env/InfoCacheParam.scala      |  30 --
 .../config/params/env/PersistParam.scala        |  30 --
 .../measure/config/params/env/SMSParam.scala    |  36 --
 .../measure/config/params/env/SparkParam.scala  |  36 --
 .../config/params/user/DataConnectorParam.scala |  33 --
 .../config/params/user/DataSourceParam.scala    |  35 --
 .../config/params/user/EvaluateRuleParam.scala  |  30 --
 .../measure/config/params/user/UserParam.scala  |  64 ---
 .../measure/config/reader/ParamFileReader.scala |  38 --
 .../config/reader/ParamHdfsFileReader.scala     |  38 --
 .../config/reader/ParamRawStringReader.scala    |  35 --
 .../measure/config/reader/ParamReader.scala     |  30 --
 .../config/reader/ParamReaderFactory.scala      |  40 --
 .../config/validator/ParamValidator.scala       |  32 --
 .../measure/configuration/enums/DqType.scala    | 112 +++++
 .../measure/configuration/enums/DslType.scala   |  64 +++
 .../configuration/enums/NormalizeType.scala     |  86 ++++
 .../configuration/enums/ProcessType.scala       |  56 +++
 .../measure/configuration/enums/WriteMode.scala |  43 ++
 .../configuration/json/ParamFileReader.scala    |  43 ++
 .../configuration/json/ParamJsonReader.scala    |  39 ++
 .../configuration/json/ParamReader.scala        |  35 ++
 .../configuration/json/ParamReaderFactory.scala |  48 +++
 .../measure/configuration/params/AllParam.scala |  36 ++
 .../measure/configuration/params/DQParam.scala  | 183 +++++++++
 .../measure/configuration/params/EnvParam.scala |  97 +++++
 .../measure/configuration/params/Param.scala    |  29 ++
 .../validator/ParamValidator.scala              |  38 ++
 .../griffin/measure/context/ContextId.scala     |  28 ++
 .../griffin/measure/context/DQContext.scala     |  94 +++++
 .../measure/context/DataFrameCache.scala        |  72 ++++
 .../griffin/measure/context/MetricWrapper.scala |  49 +++
 .../griffin/measure/context/TableRegister.scala |  81 ++++
 .../griffin/measure/context/TimeRange.scala     |  48 +++
 .../measure/context/datasource/DataSource.scala |  97 +++++
 .../context/datasource/DataSourceFactory.scala  |  66 +++
 .../datasource/cache/DataSourceCache.scala      | 366 +++++++++++++++++
 .../cache/DataSourceCacheFactory.scala          |  68 ++++
 .../datasource/cache/JsonDataSourceCache.scala  |  40 ++
 .../datasource/cache/OrcDataSourceCache.scala   |  40 ++
 .../cache/ParquetDataSourceCache.scala          |  42 ++
 .../context/datasource/cache/WithFanIn.scala    |  69 ++++
 .../datasource/connector/DataConnector.scala    | 112 +++++
 .../connector/DataConnectorFactory.scala        | 125 ++++++
 .../batch/AvroBatchDataConnector.scala          |  71 ++++
 .../connector/batch/BatchDataConnector.scala    |  27 ++
 .../batch/HiveBatchDataConnector.scala          |  86 ++++
 .../batch/TextDirBatchDataConnector.scala       | 106 +++++
 .../streaming/KafkaStreamingDataConnector.scala |  85 ++++
 .../KafkaStreamingStringDataConnector.scala     |  71 ++++
 .../streaming/StreamingDataConnector.scala      |  46 +++
 .../datasource/info/DataSourceCacheable.scala   |  88 ++++
 .../context/datasource/info/TmstCache.scala     |  47 +++
 .../context/streaming/info/InfoCache.scala      |  39 ++
 .../streaming/info/InfoCacheFactory.scala       |  41 ++
 .../streaming/info/InfoCacheInstance.scala      |  53 +++
 .../context/streaming/info/TimeInfoCache.scala  | 127 ++++++
 .../context/streaming/info/ZKInfoCache.scala    | 217 ++++++++++
 .../context/streaming/lock/CacheLock.scala      |  34 ++
 .../context/streaming/lock/MultiCacheLock.scala |  39 ++
 .../context/streaming/lock/ZKCacheLock.scala    |  53 +++
 .../streaming/metric/AccuracyMetric.scala       |  54 +++
 .../context/streaming/metric/CacheResults.scala |  78 ++++
 .../context/streaming/metric/Metric.scala       |  35 ++
 .../measure/context/writer/HdfsPersist.scala    | 190 +++++++++
 .../measure/context/writer/HttpPersist.scala    |  76 ++++
 .../measure/context/writer/LoggerPersist.scala  |  82 ++++
 .../measure/context/writer/MongoPersist.scala   | 118 ++++++
 .../measure/context/writer/MultiPersists.scala  |  83 ++++
 .../measure/context/writer/Persist.scala        |  45 ++
 .../measure/context/writer/PersistFactory.scala |  55 +++
 .../context/writer/PersistThreadPool.scala      |  81 ++++
 .../measure/data/connector/DataConnector.scala  | 147 -------
 .../data/connector/DataConnectorFactory.scala   | 147 -------
 .../batch/AvroBatchDataConnector.scala          | 115 ------
 .../connector/batch/BatchDataConnector.scala    |  35 --
 .../batch/HiveBatchDataConnector.scala          | 165 --------
 .../batch/TextDirBatchDataConnector.scala       | 139 -------
 .../streaming/KafkaStreamingDataConnector.scala |  87 ----
 .../KafkaStreamingStringDataConnector.scala     |  65 ---
 .../streaming/StreamingDataConnector.scala      |  45 --
 .../measure/data/source/DataSource.scala        | 140 -------
 .../measure/data/source/DataSourceFactory.scala |  80 ----
 .../data/source/cache/DataCacheable.scala       |  84 ----
 .../data/source/cache/DataSourceCache.scala     | 380 -----------------
 .../source/cache/DataSourceCacheFactory.scala   |  58 ---
 .../data/source/cache/JsonDataSourceCache.scala |  40 --
 .../data/source/cache/OrcDataSourceCache.scala  |  40 --
 .../source/cache/ParquetDataSourceCache.scala   |  40 --
 .../measure/data/source/cache/WithFanIn.scala   |  57 ---
 .../org/apache/griffin/measure/job/DQJob.scala  |  32 ++
 .../measure/job/builder/DQJobBuilder.scala      |  68 ++++
 .../apache/griffin/measure/launch/DQApp.scala   |  52 +++
 .../measure/launch/batch/BatchDQApp.scala       | 107 +++++
 .../launch/streaming/StreamingDQApp.scala       | 177 ++++++++
 .../launch/streaming/StreamingDQApp2.scala      | 104 +++++
 .../apache/griffin/measure/log/Loggable.scala   |  43 --
 .../griffin/measure/persist/HdfsPersist.scala   | 344 ----------------
 .../griffin/measure/persist/HttpPersist.scala   | 115 ------
 .../griffin/measure/persist/LoggerPersist.scala | 184 ---------
 .../griffin/measure/persist/MongoPersist.scala  | 119 ------
 .../measure/persist/MongoThreadPool.scala       |  73 ----
 .../griffin/measure/persist/MultiPersists.scala |  91 -----
 .../griffin/measure/persist/Persist.scala       |  57 ---
 .../measure/persist/PersistFactory.scala        |  55 ---
 .../measure/persist/PersistThreadPool.scala     |  62 ---
 .../measure/process/BatchDqProcess.scala        | 177 --------
 .../griffin/measure/process/DqProcess.scala     |  45 --
 .../griffin/measure/process/ExportMode.scala    |  34 --
 .../griffin/measure/process/ProcessType.scala   |  47 ---
 .../measure/process/StreamingDqProcess.scala    | 180 --------
 .../measure/process/StreamingDqThread.scala     | 146 -------
 .../griffin/measure/process/TimingProcess.scala |  46 ---
 .../process/engine/DataFrameOprEngine.scala     | 179 --------
 .../measure/process/engine/DqEngine.scala       |  51 ---
 .../process/engine/DqEngineFactory.scala        |  47 ---
 .../measure/process/engine/DqEngines.scala      | 242 -----------
 .../measure/process/engine/SparkDqEngine.scala  | 183 ---------
 .../process/engine/SparkRowFormatter.scala      |  62 ---
 .../measure/process/engine/SparkSqlEngine.scala |  76 ----
 .../measure/process/temp/DataFrameCaches.scala  | 133 ------
 .../measure/process/temp/TableRegisters.scala   | 153 -------
 .../measure/process/temp/TableRegs.scala        |  81 ----
 .../measure/process/temp/TimeRange.scala        |  48 ---
 .../griffin/measure/result/AccuracyResult.scala |  50 ---
 .../griffin/measure/result/DataInfo.scala       |  50 ---
 .../griffin/measure/result/ProfileResult.scala  |  48 ---
 .../apache/griffin/measure/result/Result.scala  |  36 --
 .../measure/rule/adaptor/AdaptPhase.scala       |  25 --
 .../rule/adaptor/DataFrameOprAdaptor.scala      |  62 ---
 .../measure/rule/adaptor/GlobalKeys.scala       |  27 --
 .../rule/adaptor/GriffinDslAdaptor.scala        |  73 ----
 .../measure/rule/adaptor/InternalColumns.scala  |  35 --
 .../measure/rule/adaptor/RuleAdaptor.scala      | 100 -----
 .../measure/rule/adaptor/RuleAdaptorGroup.scala | 311 --------------
 .../measure/rule/adaptor/SparkSqlAdaptor.scala  |  55 ---
 .../griffin/measure/rule/dsl/CollectType.scala  |  57 ---
 .../griffin/measure/rule/dsl/DqType.scala       |  75 ----
 .../griffin/measure/rule/dsl/DslType.scala      |  53 ---
 .../griffin/measure/rule/dsl/PersistType.scala  |  60 ---
 .../rule/dsl/analyzer/AccuracyAnalyzer.scala    |  41 --
 .../rule/dsl/analyzer/BasicAnalyzer.scala       |  53 ---
 .../dsl/analyzer/CompletenessAnalyzer.scala     |  46 ---
 .../dsl/analyzer/DistinctnessAnalyzer.scala     |  47 ---
 .../rule/dsl/analyzer/ProfilingAnalyzer.scala   |  42 --
 .../rule/dsl/analyzer/TimelinessAnalyzer.scala  |  65 ---
 .../rule/dsl/analyzer/UniquenessAnalyzer.scala  |  46 ---
 .../measure/rule/dsl/expr/AliasableExpr.scala   |  25 --
 .../rule/dsl/expr/ClauseExpression.scala        | 270 ------------
 .../griffin/measure/rule/dsl/expr/Expr.scala    |  32 --
 .../griffin/measure/rule/dsl/expr/ExprTag.scala |  23 --
 .../rule/dsl/expr/ExtraConditionExpr.scala      |  27 --
 .../measure/rule/dsl/expr/FunctionExpr.scala    |  45 --
 .../measure/rule/dsl/expr/LiteralExpr.scala     |  72 ----
 .../measure/rule/dsl/expr/LogicalExpr.scala     | 204 ----------
 .../measure/rule/dsl/expr/MathExpr.scala        |  94 -----
 .../measure/rule/dsl/expr/SelectExpr.scala      | 132 ------
 .../measure/rule/dsl/expr/TreeNode.scala        |  45 --
 .../measure/rule/dsl/parser/BasicParser.scala   | 388 ------------------
 .../rule/dsl/parser/GriffinDslParser.scala      |  94 -----
 .../griffin/measure/rule/plan/DfOprStep.scala   |  32 --
 .../griffin/measure/rule/plan/DsUpdate.scala    |  24 --
 .../measure/rule/plan/MetricExport.scala        |  31 --
 .../measure/rule/plan/RecordExport.scala        |  31 --
 .../griffin/measure/rule/plan/RuleExport.scala  |  33 --
 .../griffin/measure/rule/plan/RulePlan.scala    |  59 ---
 .../griffin/measure/rule/plan/RuleStep.scala    |  40 --
 .../measure/rule/plan/SparkSqlStep.scala        |  32 --
 .../griffin/measure/rule/plan/TimeInfo.scala    |  37 --
 .../rule/preproc/PreProcRuleGenerator.scala     |  72 ----
 .../rule/trans/AccuracyRulePlanTrans.scala      | 200 ---------
 .../rule/trans/CompletenessRulePlanTrans.scala  | 145 -------
 .../rule/trans/DistinctnessRulePlanTrans.scala  | 348 ----------------
 .../measure/rule/trans/DsUpdateFactory.scala    |  37 --
 .../rule/trans/ProfilingRulePlanTrans.scala     | 100 -----
 .../measure/rule/trans/RuleExportFactory.scala  |  65 ---
 .../measure/rule/trans/RulePlanTrans.scala      |  60 ---
 .../rule/trans/TimelinessRulePlanTrans.scala    | 224 ----------
 .../rule/trans/UniquenessRulePlanTrans.scala    | 200 ---------
 .../griffin/measure/rule/udf/GriffinUdafs.scala |  29 --
 .../griffin/measure/rule/udf/GriffinUdfs.scala  |  43 --
 .../griffin/measure/rule/udf/MeanUdaf.scala     |  58 ---
 .../apache/griffin/measure/step/DQStep.scala    |  32 ++
 .../apache/griffin/measure/step/SeqDQStep.scala |  44 ++
 .../builder/BatchDataSourceStepBuilder.scala    |  31 ++
 .../measure/step/builder/ConstantColumns.scala  |  38 ++
 .../measure/step/builder/DQStepBuilder.scala    |  81 ++++
 .../step/builder/DQStepNameGenerator.scala      |  34 ++
 .../builder/DataFrameOpsDQStepBuilder.scala     |  35 ++
 .../builder/DataSourceParamStepBuilder.scala    |  44 ++
 .../step/builder/GriffinDslDQStepBuilder.scala  |  60 +++
 .../step/builder/RuleParamStepBuilder.scala     |  61 +++
 .../step/builder/SparkSqlDQStepBuilder.scala    |  35 ++
 .../StreamingDataSourceStepBuilder.scala        |  31 ++
 .../step/builder/dsl/expr/AliasableExpr.scala   |  25 ++
 .../builder/dsl/expr/ClauseExpression.scala     | 270 ++++++++++++
 .../measure/step/builder/dsl/expr/Expr.scala    |  35 ++
 .../measure/step/builder/dsl/expr/ExprTag.scala |  23 ++
 .../builder/dsl/expr/ExtraConditionExpr.scala   |  27 ++
 .../step/builder/dsl/expr/FunctionExpr.scala    |  45 ++
 .../step/builder/dsl/expr/LiteralExpr.scala     |  72 ++++
 .../step/builder/dsl/expr/LogicalExpr.scala     | 204 ++++++++++
 .../step/builder/dsl/expr/MathExpr.scala        |  94 +++++
 .../step/builder/dsl/expr/SelectExpr.scala      | 132 ++++++
 .../step/builder/dsl/expr/TreeNode.scala        |  45 ++
 .../step/builder/dsl/parser/BasicParser.scala   | 391 ++++++++++++++++++
 .../builder/dsl/parser/GriffinDslParser.scala   |  98 +++++
 .../dsl/transform/AccuracyExpr2DQSteps.scala    | 200 +++++++++
 .../transform/CompletenessExpr2DQSteps.scala    | 157 +++++++
 .../transform/DistinctnessExpr2DQSteps.scala    | 358 ++++++++++++++++
 .../builder/dsl/transform/Expr2DQSteps.scala    |  59 +++
 .../dsl/transform/ProfilingExpr2DQSteps.scala   | 105 +++++
 .../dsl/transform/TimelinessExpr2DQSteps.scala  | 234 +++++++++++
 .../dsl/transform/UniquenessExpr2DQSteps.scala  | 204 ++++++++++
 .../transform/analyzer/AccuracyAnalyzer.scala   |  41 ++
 .../dsl/transform/analyzer/BasicAnalyzer.scala  |  55 +++
 .../analyzer/CompletenessAnalyzer.scala         |  46 +++
 .../analyzer/DistinctnessAnalyzer.scala         |  47 +++
 .../transform/analyzer/ProfilingAnalyzer.scala  |  42 ++
 .../transform/analyzer/TimelinessAnalyzer.scala |  65 +++
 .../transform/analyzer/UniquenessAnalyzer.scala |  46 +++
 .../preproc/PreProcRuleParamGenerator.scala     |  72 ++++
 .../measure/step/builder/udf/GriffinUDFs.scala  |  63 +++
 .../griffin/measure/step/read/ReadStep.scala    |  48 +++
 .../measure/step/read/UnionReadStep.scala       |  41 ++
 .../measure/step/transform/DataFrameOps.scala   | 134 ++++++
 .../transform/DataFrameOpsTransformStep.scala   |  52 +++
 .../step/transform/SparkSqlTransformStep.scala  |  47 +++
 .../measure/step/transform/TransformStep.scala  |  31 ++
 .../step/write/DsCacheUpdateWriteStep.scala     |  61 +++
 .../measure/step/write/MetricFlushStep.scala    |  45 ++
 .../measure/step/write/MetricWriteStep.scala    | 115 ++++++
 .../measure/step/write/RecordWriteStep.scala    | 151 +++++++
 .../measure/step/write/SparkRowFormatter.scala  |  65 +++
 .../griffin/measure/step/write/WriteStep.scala  |  31 ++
 .../apache/griffin/measure/util/MailUtil.java   |  74 ----
 .../apache/griffin/measure/util/Md5Util.java    |  56 ---
 .../griffin/measure/util/MessageUtil.java       | 178 --------
 .../griffin/measure/utils/DataFrameUtil.scala   |   2 +-
 .../apache/griffin/measure/utils/FSUtil.scala   |  82 ++++
 .../measure/utils/HdfsFileDumpUtil.scala        | 101 -----
 .../apache/griffin/measure/utils/HdfsUtil.scala |  71 ++--
 .../griffin/measure/utils/ParamUtil.scala       |  12 +
 .../apache/griffin/measure/utils/TimeUtil.scala |   2 +-
 .../resources/_accuracy-batch-griffindsl.json   |  56 ---
 .../resources/_accuracy-batch-sparksql.json     |  63 ---
 .../_accuracy-streaming-griffindsl.json         | 121 ------
 .../resources/_accuracy-streaming-sparksql.json | 142 -------
 .../_completeness-batch-griffindsl.json         |  36 --
 .../_completeness-streaming-griffindsl.json     |  65 ---
 .../_distinctness-batch-griffindsl.json         |  57 ---
 .../_distinctness-batch-griffindsl1.json        |  73 ----
 .../_distinctness-batch-griffindsl2.json        |  74 ----
 .../_distinctness-streaming-griffindsl.json     |  85 ----
 .../_profiling-batch-griffindsl-hive.json       |  48 ---
 .../resources/_profiling-batch-griffindsl.json  |  54 ---
 .../resources/_profiling-batch-sparksql.json    |  44 --
 .../_profiling-streaming-griffindsl.json        |  75 ----
 .../_profiling-streaming-sparksql.json          |  80 ----
 .../resources/_timeliness-batch-griffindsl.json |  49 ---
 .../resources/_timeliness-batch-sparksql.json   |  52 ---
 .../_timeliness-streaming-griffindsl.json       |  79 ----
 .../_timeliness-streaming-sparksql.json         |  82 ----
 .../resources/_uniqueness-batch-griffindsl.json |  58 ---
 .../_uniqueness-streaming-griffindsl.json       | 119 ------
 .../_uniqueness-streaming-sparksql.json         | 130 ------
 .../src/test/resources/config-griffindsl.json   |  56 +++
 measure/src/test/resources/config-profile.json  |  19 -
 .../resources/config-streaming-accuracy.json    | 121 ++++++
 .../src/test/resources/config-streaming.json    | 126 +++---
 .../src/test/resources/config-streaming1.json   |  65 ---
 .../src/test/resources/config-streaming2.json   |  65 ---
 .../src/test/resources/config-streaming3.json   |  65 ---
 .../resources/config-test-accuracy-new.json     |  56 ---
 .../resources/config-test-accuracy-new2.json    |  72 ----
 .../config-test-accuracy-streaming-multids.json | 142 -------
 .../config-test-accuracy-streaming-new.json     | 117 ------
 .../config-test-accuracy-streaming-new2.json    | 133 ------
 .../config-test-accuracy-streaming.json         | 117 ------
 .../test/resources/config-test-accuracy.json    |  58 ---
 .../test/resources/config-test-accuracy2.json   |  64 ---
 .../resources/config-test-profiling-new.json    |  80 ----
 .../resources/config-test-profiling-new2.json   |  36 --
 .../config-test-profiling-streaming-new.json    |  85 ----
 .../config-test-profiling-streaming-new2.json   |  72 ----
 .../config-test-profiling-streaming.json        |  91 -----
 .../test/resources/config-test-profiling.json   |  36 --
 .../test/resources/config-test-profiling1.json  |  60 ---
 .../test/resources/config-test-profiling2.json  |  35 --
 measure/src/test/resources/config-test.json     |  55 ---
 measure/src/test/resources/config-test1.json    |  96 -----
 measure/src/test/resources/config.json          |  82 +++-
 measure/src/test/resources/config1.json         |  29 --
 measure/src/test/resources/dupdata.avro         | Bin 304 -> 0 bytes
 measure/src/test/resources/empty.avro           | Bin 215 -> 0 bytes
 measure/src/test/resources/env-hdfs-test.json   |  45 --
 .../src/test/resources/env-streaming-mongo.json |  54 ---
 measure/src/test/resources/env-test.json        |  39 --
 measure/src/test/resources/env.json             |  20 +-
 measure/src/test/resources/env1.json            |  21 -
 .../resources/performance-test-accuracy.json    |  56 ---
 .../resources/performance-test-profiling.json   |  34 --
 measure/src/test/resources/test-data0.json      |  56 ---
 measure/src/test/resources/users_info_src.dat   |  50 ---
 .../src/test/resources/users_info_target.dat    |  50 ---
 .../griffin/measure/ApplicationTest.scala       |  25 ++
 .../reader/ParamRawStringReaderTest.scala       |  37 --
 .../validator/AllParamValidatorTest.scala       |  40 --
 .../measure/data/connector/ConnectorTest.scala  |  97 -----
 .../measure/persist/HdfsPersistTest.scala       |  48 ---
 .../measure/persist/HttpPersistTest.scala       |  42 --
 .../measure/persist/MongoPersistTest.scala      |  47 ---
 .../measure/result/AccuracyResultTest.scala     |  57 ---
 .../measure/result/ProfileResultTest.scala      |  57 ---
 .../rule/adaptor/GriffinDslAdaptorTest.scala    | 178 --------
 .../rule/adaptor/RuleAdaptorGroupTest.scala     |  70 ----
 .../rule/adaptor/SparkSqlAdaptorTest.scala      |  59 ---
 .../rule/dsl/parser/BasicParserTest.scala       | 234 -----------
 .../measure/rule/udf/GriffinUdfsTest.scala      |  52 ---
 .../griffin/measure/rule/udf/MeanUdafTest.scala | 117 ------
 .../griffin/measure/utils/HdfsUtilTest.scala    | 132 ------
 .../griffin/measure/utils/ParamUtilTest.scala   |  50 ---
 .../griffin/measure/utils/TimeUtilTest.scala    |  38 --
 348 files changed, 10318 insertions(+), 17321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index 04402af..79cf3cb 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -18,211 +18,205 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.griffin</groupId>
-    <artifactId>griffin</artifactId>
-    <version>0.2.0-incubating-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>measure</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Griffin :: Measures</name>
-  <url>http://maven.apache.org</url>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
-    <maven.compiler.source>1.8</maven.compiler.source>
-    <maven.compiler.target>1.8</maven.compiler.target>
-
-    <scala.version>2.11.8</scala.version>
-    <spark.version>2.2.1</spark.version>
-    <scala.binary.version>2.11</scala.binary.version>
-
-    <avro.version>1.7.7</avro.version>
-    <jackson.version>2.8.7</jackson.version>
-    <scalaj.version>2.3.0</scalaj.version>
-    <mongo.version>2.1.0</mongo.version>
-    <junit.version>4.11</junit.version>
-    <scalatest.version>3.0.0</scalatest.version>
-    <slf4j.version>1.7.21</slf4j.version>
-    <log4j.version>1.2.16</log4j.version>
-    <curator.version>2.10.0</curator.version>
-    <scalamock.version>3.6.0</scalamock.version>
-  </properties>
-
-  <dependencies>
-    <!--scala-->
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-
-    <!--spark, spark streaming, spark hive-->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-hive_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
-
-    <!--jackson-->
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.module</groupId>
-      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-
-    <!--scalaj for http request-->
-    <dependency>
-      <groupId>org.scalaj</groupId>
-      <artifactId>scalaj-http_${scala.binary.version}</artifactId>
-      <version>${scalaj.version}</version>
-    </dependency>
-
-    <!--mongo request-->
-    <dependency>
-      <groupId>org.mongodb.scala</groupId>
-      <artifactId>mongo-scala-driver_2.11</artifactId>
-      <version>${mongo.version}</version>
-    </dependency>
-
-    <!--avro-->
-    <dependency>
-      <groupId>com.databricks</groupId>
-      <artifactId>spark-avro_${scala.binary.version}</artifactId>
-      <version>4.0.0</version>
-    </dependency>
-
-    <!--log4j-->
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
-    <!--<dependency>-->
-    <!--<groupId>org.slf4j</groupId>-->
-    <!--<artifactId>slf4j-simple</artifactId>-->
-    <!--<version>${slf4j.version}</version>-->
-    <!--</dependency>-->
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <version>${log4j.version}</version>
-    </dependency>
-
-    <!--curator-->
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <version>${curator.version}</version>
-    </dependency>
-
-    <!--junit-->
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <!--scala test-->
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>${scalatest.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <!--scala mock-->
-    <dependency>
-      <groupId>org.scalamock</groupId>
-      
<artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
-      <version>${scalamock.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.sun.mail</groupId>
-      <artifactId>javax.mail</artifactId>
-      <version>1.4.4</version>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
-        <executions>
-          <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.5.1</version>
-        <configuration>
-          <source>1.8</source>
-          <target>1.8</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-          <appendAssemblyId>false</appendAssemblyId>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.griffin</groupId>
+        <artifactId>griffin</artifactId>
+        <version>0.2.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>measure</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Apache Griffin :: Measures</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+
+        <scala.version>2.11.8</scala.version>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+
+        <avro.version>1.7.7</avro.version>
+        <jackson.version>2.8.7</jackson.version>
+        <scalaj.version>2.3.0</scalaj.version>
+        <mongo.version>2.1.0</mongo.version>
+        <junit.version>4.11</junit.version>
+        <scalatest.version>3.0.0</scalatest.version>
+        <slf4j.version>1.7.21</slf4j.version>
+        <log4j.version>1.2.16</log4j.version>
+        <curator.version>2.10.0</curator.version>
+        <scalamock.version>3.6.0</scalamock.version>
+    </properties>
+
+    <dependencies>
+        <!--scala-->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
+        <!--spark, spark streaming, spark hive-->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!--jackson-->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <!--scalaj for http request-->
+        <dependency>
+            <groupId>org.scalaj</groupId>
+            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+            <version>${scalaj.version}</version>
+        </dependency>
+
+        <!--mongo request-->
+        <dependency>
+            <groupId>org.mongodb.scala</groupId>
+            <artifactId>mongo-scala-driver_2.11</artifactId>
+            <version>${mongo.version}</version>
+        </dependency>
+
+        <!--avro-->
+        <dependency>
+            <groupId>com.databricks</groupId>
+            <artifactId>spark-avro_${scala.binary.version}</artifactId>
+            <version>4.0.0</version>
+        </dependency>
+
+        <!--log4j-->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <!--<dependency>-->
+        <!--<groupId>org.slf4j</groupId>-->
+        <!--<artifactId>slf4j-simple</artifactId>-->
+        <!--<version>${slf4j.version}</version>-->
+        <!--</dependency>-->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <!--curator-->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+
+        <!--junit-->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!--scala test-->
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>${scalatest.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!--scala mock-->
+        <dependency>
+            <groupId>org.scalamock</groupId>
+            
<artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
+            <version>${scalamock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <appendAssemblyId>false</appendAssemblyId>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-old.json 
b/measure/src/main/resources/config-old.json
deleted file mode 100644
index ab32b75..0000000
--- a/measure/src/main/resources/config-old.json
+++ /dev/null
@@ -1,31 +0,0 @@
-{
-  "name": "accu1",
-  "type": "accuracy",
-
-  "process.type": "batch",
-
-  "source": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "database": "default",
-      "table.name": "users_info_src",
-      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-    }
-  },
-
-  "target": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "database": "default",
-      "table.name": "users_info_target",
-      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 0.2,
-    "rules": "$source.user_id = $target.user_id AND $source.first_name = 
$target.first_name AND $source.last_name = $target.last_name AND 
$source.address = $target.address AND $source.email = $target.email AND 
$source.phone = $target.phone AND $source.post_code = $target.post_code"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config-sql.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-sql.json 
b/measure/src/main/resources/config-sql.json
deleted file mode 100644
index aad9584..0000000
--- a/measure/src/main/resources/config-sql.json
+++ /dev/null
@@ -1,54 +0,0 @@
-{
-  "name": "accu1",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "hive",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "users_info_src",
-            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-          }
-        }
-      ]
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "hive",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "users_info_target",
-            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluateRule": {
-    "dsl.type": "spark-sql",
-    "rules": [
-      {
-        "name": "miss.record",
-        "rule": "SELECT source.name FROM source LEFT JOIN target ON 
coalesce(source.name, 'null') = coalesce(target.name, 'null') WHERE (NOT 
(source.name IS NULL)) AND (target.name IS NULL)",
-        "persist.type": "record"
-      }, {
-        "name": "miss.count",
-        "rule": "SELECT COUNT(*) FROM miss",
-        "persist.type": "metric"
-      }, {
-        "name": "total.count",
-        "rule": "SELECT COUNT(*) FROM source",
-        "persist.type": "metric"
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-streaming.json 
b/measure/src/main/resources/config-streaming.json
deleted file mode 100644
index 1705174..0000000
--- a/measure/src/main/resources/config-streaming.json
+++ /dev/null
@@ -1,61 +0,0 @@
-{
-  "name": "accu2",
-  "type": "accuracy",
-
-  "process.type": "steaming",
-
-  "source": {
-    "type": "kafka",
-    "version": "0.8",
-    "config": {
-      "kafka.config": {
-        "bootstrap.servers": "a.b.c.d:9092",
-        "group.id": "group1",
-        "auto.offset.reset": "smallest",
-        "auto.commit.enable": "false",
-      },
-      "topics": "src",
-      "key.type": "java.lang.String",
-      "value.type": "java.lang.String",
-      "cache": {
-        "type": "df",
-        "config": {
-          "table.name": "source",
-          "info.path": "src",
-          "ready.time.interval": "1m",
-          "ready.time.delay": "1m"
-        }
-      }
-    }
-  },
-
-  "target": {
-    "type": "kafka",
-    "version": "0.8",
-    "config": {
-      "kafka.config": {
-        "bootstrap.servers": "a.b.c.d:9092",
-        "group.id": "group1",
-        "auto.offset.reset": "smallest",
-        "auto.commit.enable": "false",
-      },
-      "topics": "tgt",
-      "key.type": "java.lang.String",
-      "value.type": "java.lang.String",
-      "cache": {
-        "type": "hive",
-        "version": 1.2,
-        "config": {
-          "database": "default",
-          "table.name": "target_table",
-          "info.path": "tgt"
-        }
-      }
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 0.2,
-    "rules": "$source.json().seeds[*].json().url = 
$target.json().groups[0].attrsList['name' = 'URL'].values[0] AND 
$source.json().seeds[*].json().metadata.json().tracker.crawlRequestCreateTS = 
$target.json().groups[0].attrsList['name' = 
'CRAWLMETADATA'].values[0].json().tracker.crawlRequestCreateTS WHEN 
$source._timestamp_ + 24h < $target._timestamp_"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config.json 
b/measure/src/main/resources/config.json
deleted file mode 100644
index 1edb299..0000000
--- a/measure/src/main/resources/config.json
+++ /dev/null
@@ -1,59 +0,0 @@
-{
-  "name": "accu1",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "hive",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "users_info_src",
-            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-          }
-        }
-      ]
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "hive",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "users_info_target",
-            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-          }
-        }
-      ]
-    }
-  ],
-  "evaluateRule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "accuracy",
-        "rule": "source.user_id = target.user_id AND source.first_name = 
target.first_name AND source.last_name = target.last_name AND source.address = 
target.address AND source.email = target.email AND source.phone = target.phone 
AND source.post_code = target.post_code",
-        "details": {
-          "source": "source",
-          "miss.record": {
-            "name": "miss.record",
-            "persist.type": "record"
-          },
-          "miss.count": {
-            "name": "miss.count",
-            "persist.type": "metric"
-          },
-          "total.count": {
-            "name": "total.count",
-            "persist.type": "metric"
-          }
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env.json 
b/measure/src/main/resources/env.json
deleted file mode 100644
index 1081b6a..0000000
--- a/measure/src/main/resources/env.json
+++ /dev/null
@@ -1,60 +0,0 @@
-{
-  "spark": {
-    "log.level": "INFO",
-    "checkpoint.dir": "hdfs:///griffin/batch/cp",
-    "batch.interval": "10s",
-    "process.interval": "10m",
-    "config": {}
-  },
-
-  "persist": [
-    {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs:///griffin/streaming/persist",
-        "max.persist.lines": 10000,
-        "max.lines.per.file": 10000
-      }
-    },
-    {
-      "type": "http",
-      "config": {
-        "method": "post",
-        "api": "http://HOSTNAME:9200/griffin/accuracy";
-      }
-    }
-  ],
-
-  "info.cache": [
-    {
-      "type": "zk",
-      "config": {
-        "hosts": "localhost:2181",
-        "namespace": "griffin/infocache",
-        "lock.path": "lock",
-        "mode": "persist",
-        "init.clear": true,
-        "close.clear": false
-      }
-    }
-  ],
-  "mail":[
-    {
-      "host":"smtp.163.com",//mail host(163)
-      "mail":"[email protected]",//mail
-      "user":"[email protected]",//mail user
-      "password":"xxx"//mail password
-    }
-  ],
-  "sms":[
-    {
-      "host":"xxx",//meassage url
-      "id":"xx",
-      "key":"xxx",
-      "UUID":"xxx"
-    }
-  ],
-  "cleaner": {
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/Application.scala 
b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 43781f2..25dc34e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,54 +18,51 @@ under the License.
 */
 package org.apache.griffin.measure
 
-import org.apache.griffin.measure.config.params._
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.config.reader._
-import org.apache.griffin.measure.config.validator._
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist.PersistThreadPool
-import org.apache.griffin.measure.process._
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.json.ParamReaderFactory
+import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, 
EnvParam, Param}
+import org.apache.griffin.measure.configuration.validator.ParamValidator
+import org.apache.griffin.measure.context.writer.PersistThreadPool
+import org.apache.griffin.measure.launch.DQApp
+import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.launch.streaming.StreamingDQApp
 
 import scala.util.{Failure, Success, Try}
 
+/**
+  * application entrance
+  */
 object Application extends Loggable {
 
   def main(args: Array[String]): Unit = {
     info(args.toString)
     if (args.length < 2) {
-      error("Usage: class <env-param> <user-param> [List of String split by 
comma: raw | local | hdfs(default)]")
+      error("Usage: class <env-param> <dq-param>")
       sys.exit(-1)
     }
 
     val envParamFile = args(0)
-    val userParamFile = args(1)
-    val (envFsType, userFsType) = if (args.length > 2) {
-      val fsTypes = args(2).trim.split(",")
-      if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim)
-      else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim)
-      else ("hdfs", "hdfs")
-    } else ("hdfs", "hdfs")
+    val dqParamFile = args(1)
 
     info(envParamFile)
-    info(userParamFile)
+    info(dqParamFile)
 
     // read param files
-    val envParam = readParamFile[EnvParam](envParamFile, envFsType) match {
+    val envParam = readParamFile[EnvParam](envParamFile) match {
       case Success(p) => p
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-2)
       }
     }
-    val userParam = readParamFile[UserParam](userParamFile, userFsType) match {
+    val dqParam = readParamFile[DQParam](dqParamFile) match {
       case Success(p) => p
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-2)
       }
     }
-    val allParam: AllParam = AllParam(envParam, userParam)
+    val allParam: AllParam = AllParam(envParam, dqParam)
 
     // validate param files
     ParamValidator.validate(allParam) match {
@@ -78,18 +75,19 @@ object Application extends Loggable {
       }
     }
 
-    // choose algorithm
-//    val dqType = allParam.userParam.dqType
-    val procType = ProcessType(allParam.userParam.procType)
-    val proc: DqProcess = procType match {
-      case BatchProcessType => BatchDqProcess(allParam)
-      case StreamingProcessType => StreamingDqProcess(allParam)
+    // choose process
+    val procType = ProcessType(allParam.dqParam.procType)
+    val proc: DQApp = procType match {
+      case BatchProcessType => BatchDQApp(allParam)
+      case StreamingProcessType => StreamingDQApp(allParam)
       case _ => {
         error(s"${procType} is unsupported process type!")
         sys.exit(-4)
       }
     }
 
+    startup
+
     // process init
     proc.init match {
       case Success(_) => {
@@ -110,7 +108,7 @@ object Application extends Loggable {
       case Failure(ex) => {
         error(s"process run error: ${ex.getMessage}")
 
-        if (proc.retriable) {
+        if (proc.retryable) {
           throw ex
         } else {
           shutdown
@@ -120,7 +118,7 @@ object Application extends Loggable {
     }
 
     // process end
-    proc.end match {
+    proc.close match {
       case Success(_) => {
         info("process end success")
       }
@@ -132,45 +130,17 @@ object Application extends Loggable {
     }
 
     shutdown
-
-//    val algo: Algo = (dqType, procType) match {
-//      case (MeasureType.accuracy(), ProcessType.batch()) => 
BatchAccuracyAlgo(allParam)
-//      case (MeasureType.profile(), ProcessType.batch()) => 
BatchProfileAlgo(allParam)
-//      case (MeasureType.accuracy(), ProcessType.streaming()) => 
StreamingAccuracyAlgo(allParam)
-////      case (MeasureType.profile(), ProcessType.streaming()) => 
StreamingProfileAlgo(allParam)
-//      case _ => {
-//        error(s"${dqType} with ${procType} is unsupported dq type!")
-//        sys.exit(-4)
-//      }
-//    }
-
-    // algorithm run
-//    algo.run match {
-//      case Failure(ex) => {
-//        error(s"app error: ${ex.getMessage}")
-//
-//        procType match {
-//          case ProcessType.streaming() => {
-//            // streaming need to attempt more times by spark streaming itself
-//            throw ex
-//          }
-//          case _ => {
-//            shutdown
-//            sys.exit(-5)
-//          }
-//        }
-//      }
-//      case _ => {
-//        info("app finished and success")
-//      }
-//    }
   }
 
-  private def readParamFile[T <: Param](file: String, fsType: String)(implicit 
m : Manifest[T]): Try[T] = {
-    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+  private def readParamFile[T <: Param](file: String)(implicit m : 
Manifest[T]): Try[T] = {
+    val paramReader = ParamReaderFactory.getParamReader(file)
     paramReader.readConfig[T]
   }
 
+  private def startup(): Unit = {
+    PersistThreadPool.start
+  }
+
   private def shutdown(): Unit = {
     PersistThreadPool.shutdown
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala 
b/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala
new file mode 100644
index 0000000..961fd25
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/Loggable.scala
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure
+
+import org.slf4j.LoggerFactory
+
+trait Loggable {
+
+  @transient private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  protected def info(msg: String): Unit = {
+    logger.info(msg)
+  }
+
+  protected def debug(msg: String): Unit = {
+    logger.debug(msg)
+  }
+
+  protected def warn(msg: String): Unit = {
+    logger.warn(msg)
+  }
+
+  protected def error(msg: String): Unit = {
+    logger.error(msg)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala
deleted file mode 100644
index 8e4b25d..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.info
-
-import org.apache.griffin.measure.cache.lock.CacheLock
-import org.apache.griffin.measure.log.Loggable
-
-trait InfoCache extends Loggable with Serializable {
-
-  def init(): Unit
-  def available(): Boolean
-  def close(): Unit
-
-  def cacheInfo(info: Map[String, String]): Boolean
-  def readInfo(keys: Iterable[String]): Map[String, String]
-  def deleteInfo(keys: Iterable[String]): Unit
-  def clearInfo(): Unit
-
-  def listKeys(path: String): List[String]
-
-  def genLock(s: String): CacheLock
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
deleted file mode 100644
index 3c9d70a..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.info
-
-import org.apache.griffin.measure.config.params.env.InfoCacheParam
-
-import scala.util.{Success, Try}
-
-case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], 
metricName: String) extends Serializable {
-
-  val ZK_REGEX = """^(?i)zk|zookeeper$""".r
-
-  def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
-    val config = infoCacheParam.config
-    val infoCacheTry = infoCacheParam.persistType match {
-      case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
-      case _ => throw new Exception("not supported info cache type")
-    }
-    infoCacheTry match {
-      case Success(infoCache) => Some(infoCache)
-      case _ => None
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
deleted file mode 100644
index b116fca..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.info
-
-import org.apache.griffin.measure.cache.lock.{CacheLock, MultiCacheLock}
-import org.apache.griffin.measure.config.params.env.InfoCacheParam
-
-object InfoCacheInstance extends InfoCache {
-  var infoCaches: List[InfoCache] = Nil
-
-  def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: 
String) = {
-    val fac = InfoCacheFactory(infoCacheParams, metricName)
-    infoCaches = infoCacheParams.flatMap(param => 
fac.getInfoCache(param)).toList
-  }
-
-  def init(): Unit = infoCaches.foreach(_.init)
-  def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available)
-  def close(): Unit = infoCaches.foreach(_.close)
-
-  def cacheInfo(info: Map[String, String]): Boolean = {
-    infoCaches.foldLeft(false) { (res, infoCache) => res || 
infoCache.cacheInfo(info) }
-  }
-  def readInfo(keys: Iterable[String]): Map[String, String] = {
-    val maps = infoCaches.map(_.readInfo(keys)).reverse
-    maps.fold(Map[String, String]())(_ ++ _)
-  }
-  def deleteInfo(keys: Iterable[String]): Unit = 
infoCaches.foreach(_.deleteInfo(keys))
-  def clearInfo(): Unit = infoCaches.foreach(_.clearInfo)
-
-  def listKeys(path: String): List[String] = {
-    infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) =>
-      if (res.nonEmpty) res else infoCache.listKeys(path)
-    }
-  }
-
-  def genLock(s: String): CacheLock = 
MultiCacheLock(infoCaches.map(_.genLock(s)))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
deleted file mode 100644
index c976453..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.info
-
-import org.apache.griffin.measure.log.Loggable
-
-object TimeInfoCache extends Loggable with Serializable {
-
-  private val CacheTime = "cache.time"
-  private val LastProcTime = "last.proc.time"
-  private val ReadyTime = "ready.time"
-  private val CleanTime = "clean.time"
-  private val OldCacheIndex = "old.cache.index"
-
-  def cacheTime(path: String): String = s"${path}/${CacheTime}"
-  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
-  def readyTime(path: String): String = s"${path}/${ReadyTime}"
-  def cleanTime(path: String): String = s"${path}/${CleanTime}"
-  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
-
-  val infoPath = "info"
-
-  val finalCacheInfoPath = "info.final"
-  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
-  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
-  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
-
-  def startTimeInfoCache(): Unit = {
-    genFinalReadyTime
-  }
-
-  def getTimeRange(): (Long, Long) = {
-    readTimeRange
-  }
-
-  def getCleanTime(): Long = {
-    readCleanTime
-  }
-
-  def endTimeInfoCache: Unit = {
-    genFinalLastProcTime
-    genFinalCleanTime
-  }
-
-  private def genFinalReadyTime(): Unit = {
-    val subPath = InfoCacheInstance.listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
-    val result = InfoCacheInstance.readInfo(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalReadyTime -> time.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  private def genFinalLastProcTime(): Unit = {
-    val subPath = InfoCacheInstance.listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
-    val result = InfoCacheInstance.readInfo(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalLastProcTime -> time.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  private def genFinalCleanTime(): Unit = {
-    val subPath = InfoCacheInstance.listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
-    val result = InfoCacheInstance.readInfo(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalCleanTime -> time.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  private def readTimeRange(): (Long, Long) = {
-    val map = InfoCacheInstance.readInfo(List(finalLastProcTime, 
finalReadyTime))
-    val lastProcTime = getLong(map, finalLastProcTime)
-    val curReadyTime = getLong(map, finalReadyTime)
-    (lastProcTime, curReadyTime)
-  }
-
-  private def readCleanTime(): Long = {
-    val map = InfoCacheInstance.readInfo(List(finalCleanTime))
-    val cleanTime = getLong(map, finalCleanTime)
-    cleanTime
-  }
-
-  private def getLongOpt(map: Map[String, String], key: String): Option[Long] 
= {
-    try {
-      map.get(key).map(_.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-  private def getLong(map: Map[String, String], key: String) = {
-    getLongOpt(map, key).getOrElse(-1L)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
deleted file mode 100644
index 3789a05..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.info
-
-import org.apache.curator.framework.imps.CuratorFrameworkState
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.cache.lock.ZKCacheLock
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-
-case class ZKInfoCache(config: Map[String, Any], metricName: String) extends 
InfoCache {
-
-  val Hosts = "hosts"
-  val Namespace = "namespace"
-  val Mode = "mode"
-  val InitClear = "init.clear"
-  val CloseClear = "close.clear"
-  val LockPath = "lock.path"
-
-  val PersistRegex = """^(?i)persist$""".r
-  val EphemeralRegex = """^(?i)ephemeral$""".r
-
-  final val separator = ZKPaths.PATH_SEPARATOR
-
-  val hosts = config.getOrElse(Hosts, "").toString
-  val namespace = config.getOrElse(Namespace, "").toString
-  val mode: CreateMode = config.get(Mode) match {
-    case Some(s: String) => s match {
-      case PersistRegex() => CreateMode.PERSISTENT
-      case EphemeralRegex() => CreateMode.EPHEMERAL
-      case _ => CreateMode.PERSISTENT
-    }
-    case _ => CreateMode.PERSISTENT
-  }
-  val initClear = config.get(InitClear) match {
-    case Some(b: Boolean) => b
-    case _ => true
-  }
-  val closeClear = config.get(CloseClear) match {
-    case Some(b: Boolean) => b
-    case _ => false
-  }
-  val lockPath = config.getOrElse(LockPath, "lock").toString
-
-  private val cacheNamespace: String = if (namespace.isEmpty) metricName else 
namespace + separator + metricName
-  private val builder = CuratorFrameworkFactory.builder()
-    .connectString(hosts)
-    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
-    .namespace(cacheNamespace)
-  private val client: CuratorFramework = builder.build
-
-  def init(): Unit = {
-    client.start()
-    info("start zk info cache")
-    client.usingNamespace(cacheNamespace)
-    info(s"init with namespace: ${cacheNamespace}")
-    deleteInfo(lockPath :: Nil)
-    if (initClear) {
-      clearInfo
-    }
-  }
-
-  def available(): Boolean = {
-    client.getState match {
-      case CuratorFrameworkState.STARTED => true
-      case _ => false
-    }
-  }
-
-  def close(): Unit = {
-    if (closeClear) {
-      clearInfo
-    }
-    info("close zk info cache")
-    client.close()
-  }
-
-  def cacheInfo(info: Map[String, String]): Boolean = {
-    info.foldLeft(true) { (rs, pair) =>
-      val (k, v) = pair
-      createOrUpdate(path(k), v) && rs
-    }
-  }
-
-  def readInfo(keys: Iterable[String]): Map[String, String] = {
-    keys.flatMap { key =>
-      read(path(key)) match {
-        case Some(v) => Some((key, v))
-        case _ => None
-      }
-    }.toMap
-  }
-
-  def deleteInfo(keys: Iterable[String]): Unit = {
-    keys.foreach { key => delete(path(key)) }
-  }
-
-  def clearInfo(): Unit = {
-//    delete("/")
-    deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
-    deleteInfo(TimeInfoCache.infoPath :: Nil)
-    println("clear info")
-  }
-
-  def listKeys(p: String): List[String] = {
-    children(path(p))
-  }
-
-  def genLock(s: String): ZKCacheLock = {
-    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
-    ZKCacheLock(new InterProcessMutex(client, lpt))
-  }
-
-  private def path(k: String): String = {
-    if (k.startsWith(separator)) k else separator + k
-  }
-
-  private def children(path: String): List[String] = {
-    try {
-      client.getChildren().forPath(path).asScala.toList
-    } catch {
-      case e: Throwable => {
-        warn(s"list ${path} warn: ${e.getMessage}")
-        Nil
-      }
-    }
-  }
-
-  private def createOrUpdate(path: String, content: String): Boolean = {
-    if (checkExists(path)) {
-      update(path, content)
-    } else {
-      create(path, content)
-    }
-  }
-
-  private def create(path: String, content: String): Boolean = {
-    try {
-      client.create().creatingParentsIfNeeded().withMode(mode)
-        .forPath(path, content.getBytes("utf-8"))
-      true
-    } catch {
-      case e: Throwable => {
-        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-  private def update(path: String, content: String): Boolean = {
-    try {
-      client.setData().forPath(path, content.getBytes("utf-8"))
-      true
-    } catch {
-      case e: Throwable => {
-        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-  private def read(path: String): Option[String] = {
-    try {
-      Some(new String(client.getData().forPath(path), "utf-8"))
-    } catch {
-      case e: Throwable => {
-        warn(s"read ${path} warn: ${e.getMessage}")
-        None
-      }
-    }
-  }
-
-  private def delete(path: String): Unit = {
-    try {
-      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
-    } catch {
-      case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
-    }
-  }
-
-  private def checkExists(path: String): Boolean = {
-    try {
-      client.checkExists().forPath(path) != null
-    } catch {
-      case e: Throwable => {
-        warn(s"check exists ${path} warn: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala
deleted file mode 100644
index 24f142c..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.log.Loggable
-
-trait CacheLock extends Loggable with Serializable {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean
-
-  def unlock(): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
deleted file mode 100644
index 7b835f4..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.lock
-
-import java.util.concurrent.TimeUnit
-
-case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean = {
-    cacheLocks.headOption match {
-      case Some(cl) => cl.lock(outtime, unit)
-      case None => true
-    }
-  }
-
-  def unlock(): Unit = {
-    cacheLocks.headOption match {
-      case Some(cl) => cl.unlock
-      case None => {}
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
deleted file mode 100644
index 77ee83b..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-
-case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean = {
-    try {
-      if (outtime >= 0) {
-        mutex.acquire(outtime, unit)
-      } else {
-        mutex.acquire(-1, null)
-      }
-    } catch {
-      case e: Throwable => {
-        error(s"lock error: ${e.getMessage}")
-        false
-      }
-    }
-
-  }
-
-  def unlock(): Unit = {
-    try {
-      if (mutex.isAcquiredInThisProcess) mutex.release
-    } catch {
-      case e: Throwable => {
-        error(s"unlock error: ${e.getMessage}")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
deleted file mode 100644
index 92787be..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.result
-
-import org.apache.griffin.measure.result.Result
-
-case class CacheResult(timeGroup: Long, updateTime: Long, result: Result) {
-
-  def olderThan(ut: Long): Boolean = {
-    updateTime < ut
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
deleted file mode 100644
index 0511c04..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.result
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.result._
-
-import scala.collection.mutable.{Map => MutableMap}
-
-object CacheResultProcesser extends Loggable {
-
-  private val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
-
-  def genUpdateCacheResult(timeGroup: Long, updateTime: Long, result: Result): 
Option[CacheResult] = {
-    cacheGroup.get(timeGroup) match {
-      case Some(cr) => {
-        if (cr.olderThan(updateTime)) {
-          val existResult = cr.result
-          val newResult = 
existResult.update(result.asInstanceOf[existResult.T])
-          if (existResult.differsFrom(newResult)) {
-            Some(CacheResult(timeGroup, updateTime, newResult))
-          } else None
-        } else None
-      }
-      case _ => {
-        Some(CacheResult(timeGroup, updateTime, result))
-      }
-    }
-  }
-
-  def update(cr: CacheResult): Unit = {
-    val t = cr.timeGroup
-    cacheGroup.get(t) match {
-      case Some(c) => {
-        if (c.olderThan(cr.updateTime)) cacheGroup += (t -> cr)
-      }
-      case _ => cacheGroup += (t -> cr)
-    }
-  }
-
-  def getCacheResult(timeGroup: Long): Option[CacheResult] = {
-    cacheGroup.get(timeGroup)
-  }
-
-  def refresh(overtime: Long): Unit = {
-    val curCacheGroup = cacheGroup.toMap
-    val deadCache = curCacheGroup.filter { pr =>
-      val (_, cr) = pr
-      cr.timeGroup < overtime || cr.result.eventual()
-    }
-    info(s"=== dead cache group count: ${deadCache.size} ===")
-    deadCache.keySet.foreach(cacheGroup -= _)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala
deleted file mode 100644
index 7a570ec..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.tmst
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.rule.plan.TimeInfo
-
-object TempName extends Loggable {
-
-  def tmstName(name: String, ms: Long) = {
-    s"${name}_${ms}"
-  }
-
-  //-- temp df name --
-//  private val tmstNameRegex = """^(.*)\((\d*)\)\[(\d*)\]$""".r
-//  private val tmstNameRegex = """^(.*)_(\d*)_(\d*)$""".r
-//  def tmstName(name: String, timeInfo: TimeInfo) = {
-//    val calcTime = timeInfo.calcTime
-//    val tmst = timeInfo.tmst
-//    s"${name}_${calcTime}_${tmst}"
-//  }
-//  def extractTmstName(tmstName: String): (String, Option[Long], 
Option[Long]) = {
-//    tmstName match {
-//      case tmstNameRegex(name, calcTime, tmst) => {
-//        try { (name, Some(calcTime.toLong), Some(tmst.toLong)) } catch { 
case e: Throwable => (tmstName, None, None) }
-//      }
-//      case _ => (tmstName, None, None)
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala
deleted file mode 100644
index f031449..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.cache.tmst
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.log.Loggable
-
-import scala.collection.mutable.{SortedSet => MutableSortedSet}
-
-
-case class TmstCache() extends Loggable {
-
-  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
-
-  //-- insert tmst into tmst group --
-  def insert(tmst: Long) = tmstGroup += tmst
-  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
-
-  //-- remove tmst from tmst group --
-  def remove(tmst: Long) = tmstGroup -= tmst
-  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
-
-  //-- get subset of tmst group --
-  def range(from: Long, until: Long) = tmstGroup.range(from, until).toSet
-  def until(until: Long) = tmstGroup.until(until).toSet
-  def from(from: Long) = tmstGroup.from(from).toSet
-  def all = tmstGroup.toSet
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
deleted file mode 100644
index 8990564..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-
-// simply composite of env and user params, for convenient usage
-@JsonInclude(Include.NON_NULL)
-case class AllParam( @JsonProperty("env") envParam: EnvParam,
-                     @JsonProperty("user") userParam: UserParam
-                   ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala
deleted file mode 100644
index d3484a1..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params
-
-trait Param extends Serializable {
-
-  def validate(): Boolean = true
-  
-}


Reply via email to