[GRIFFIN-6] Push griffin streaming measure

Push the Griffin streaming measure for a use case.

Author: Liu <[email protected]>

Closes #10 from bhlx3lyx7/GRIFFIN-6.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/f783cb78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/f783cb78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/f783cb78

Branch: refs/heads/master
Commit: f783cb78ed0949e3e7d166ebcc382233475e2529
Parents: 87fd749
Author: Liu <[email protected]>
Authored: Thu Mar 30 16:11:09 2017 +0800
Committer: William Guo <[email protected]>
Committed: Thu Mar 30 16:11:09 2017 +0800

----------------------------------------------------------------------
 .../dependency-reduced-pom.xml                  | 319 ++++++++
 griffin-streaming-model/pom.xml                 | 187 +++++
 .../src/main/resources/config.json              | 190 +++++
 .../resources/kafkaConsumerConfig.properties    |   2 +
 .../scala/org/apache/griffin/Application.scala  |  71 ++
 .../org/apache/griffin/algo/AccuracyAlgo.scala  |  11 +
 .../scala/org/apache/griffin/algo/Algo.scala    |  11 +
 .../griffin/algo/StreamingAccuracyAlgo.scala    |  27 +
 .../algo/StreamingAccuracyAlgo4Crawler.scala    | 770 +++++++++++++++++++
 .../apache/griffin/algo/core/AccuracyCore.scala |  92 +++
 .../apache/griffin/cache/CacheProcesser.scala   |  54 ++
 .../apache/griffin/cache/CachedAccuData.scala   |  23 +
 .../org/apache/griffin/cache/CachedData.scala   |  27 +
 .../griffin/config/ConfigFileReader.scala       |  20 +
 .../apache/griffin/config/ConfigReader.scala    |  11 +
 .../griffin/config/params/AccuracyParam.scala   |  11 +
 .../apache/griffin/config/params/AllParam.scala |  15 +
 .../griffin/config/params/ConfigParam.scala     |  10 +
 .../griffin/config/params/DataAssetParam.scala  |  14 +
 .../griffin/config/params/DqConfigParam.scala   |  10 +
 .../apache/griffin/config/params/DqType.scala   |  11 +
 .../griffin/config/params/DumpParam.scala       |  14 +
 .../griffin/config/params/MappingParam.scala    |  12 +
 .../config/params/ParseConfigParam.scala        |  10 +
 .../griffin/config/params/ParseParam.scala      |  12 +
 .../griffin/config/params/PrepParam.scala       |  11 +
 .../griffin/config/params/RecorderParam.scala   |  12 +
 .../griffin/config/params/RetryParam.scala      |  12 +
 .../griffin/config/params/SampleParam.scala     |  11 +
 .../config/params/SchemaFieldParam.scala        |  20 +
 .../griffin/config/params/SparkParam.scala      |  15 +
 .../griffin/config/params/TimeRangeParam.scala  |  12 +
 .../griffin/config/params/TypeConfigParam.scala |  11 +
 .../org/apache/griffin/dump/DumpWrapper.scala   |  21 +
 .../apache/griffin/dump/DumpWrapperInfo.scala   |  32 +
 .../apache/griffin/dump/HdfsFileDumpUtil.scala  |  37 +
 .../apache/griffin/prep/parse/DataParser.scala  |  10 +
 .../griffin/prep/parse/Json2MapParser.scala     |  59 ++
 .../apache/griffin/record/HdfsRecorder.scala    |  87 +++
 .../apache/griffin/record/NullRecorder.scala    |  20 +
 .../apache/griffin/record/PostRecorder.scala    |  46 ++
 .../org/apache/griffin/record/Recorder.scala    |  19 +
 .../apache/griffin/record/RecorderFactory.scala |  48 ++
 .../griffin/record/cleaner/CleanerFactory.scala |  31 +
 .../griffin/record/cleaner/HdfsCleaner.scala    |  78 ++
 .../griffin/record/result/AccuResult.scala      |  18 +
 .../apache/griffin/record/result/Result.scala   |  12 +
 .../org/apache/griffin/utils/DataTypeUtil.scala | 179 +++++
 .../apache/griffin/utils/ExtractJsonUtil.scala  | 205 +++++
 .../org/apache/griffin/utils/HdfsUtil.scala     |  57 ++
 .../org/apache/griffin/utils/JsonUtil.scala     |  33 +
 .../org/apache/griffin/utils/RestfulUtil.scala  |  16 +
 .../griffin/utils/ValueListCombineUtil.scala    | 198 +++++
 .../src/test/resources/config.json              |  11 +
 .../src/test/resources/input.msg                |   1 +
 .../src/test/resources/out1.msg                 |   1 +
 .../src/test/resources/output.msg               |   1 +
 .../griffin/config/ConfigFileReaderTest.scala   |  21 +
 .../apache/griffin/config/ConfigReadTest.scala  |  45 ++
 .../griffin/prep/parse/Json2MapParserTest.scala |  49 ++
 .../org/apache/griffin/test/DataParseTest.scala | 101 +++
 61 files changed, 3474 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/dependency-reduced-pom.xml b/griffin-streaming-model/dependency-reduced-pom.xml
new file mode 100644
index 0000000..6fb4004
--- /dev/null
+++ b/griffin-streaming-model/dependency-reduced-pom.xml
@@ -0,0 +1,319 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.griffin</groupId>
+  <artifactId>griffin-crawler-model</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer>
+                  <mainClass>com.colobu.kafka.ConsumerExample</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>1.6.0</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>avro-mapred</artifactId>
+          <groupId>org.apache.avro</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>chill_2.10</artifactId>
+          <groupId>com.twitter</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>chill-java</artifactId>
+          <groupId>com.twitter</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>xbean-asm5-shaded</artifactId>
+          <groupId>org.apache.xbean</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-client</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>spark-launcher_2.10</artifactId>
+          <groupId>org.apache.spark</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>spark-network-common_2.10</artifactId>
+          <groupId>org.apache.spark</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>spark-network-shuffle_2.10</artifactId>
+          <groupId>org.apache.spark</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>spark-unsafe_2.10</artifactId>
+          <groupId>org.apache.spark</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jets3t</artifactId>
+          <groupId>net.java.dev.jets3t</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>curator-recipes</artifactId>
+          <groupId>org.apache.curator</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>javax.servlet</artifactId>
+          <groupId>org.eclipse.jetty.orbit</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-lang3</artifactId>
+          <groupId>org.apache.commons</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-math3</artifactId>
+          <groupId>org.apache.commons</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jsr305</artifactId>
+          <groupId>com.google.code.findbugs</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jul-to-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jcl-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>compress-lzf</artifactId>
+          <groupId>com.ning</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>RoaringBitmap</artifactId>
+          <groupId>org.roaringbitmap</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-net</artifactId>
+          <groupId>commons-net</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>akka-remote_2.10</artifactId>
+          <groupId>com.typesafe.akka</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>akka-slf4j_2.10</artifactId>
+          <groupId>com.typesafe.akka</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>json4s-jackson_2.10</artifactId>
+          <groupId>org.json4s</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-server</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-core</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>mesos</artifactId>
+          <groupId>org.apache.mesos</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>stream</artifactId>
+          <groupId>com.clearspring.analytics</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>metrics-core</artifactId>
+          <groupId>io.dropwizard.metrics</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>metrics-jvm</artifactId>
+          <groupId>io.dropwizard.metrics</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>metrics-json</artifactId>
+          <groupId>io.dropwizard.metrics</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>metrics-graphite</artifactId>
+          <groupId>io.dropwizard.metrics</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>ivy</artifactId>
+          <groupId>org.apache.ivy</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>oro</artifactId>
+          <groupId>oro</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>tachyon-client</artifactId>
+          <groupId>org.tachyonproject</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>pyrolite</artifactId>
+          <groupId>net.razorvine</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>py4j</artifactId>
+          <groupId>net.sf.py4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>1.6.0</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_2.10</artifactId>
+      <version>1.6.0</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>parquet-hadoop-bundle</artifactId>
+          <groupId>com.twitter</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>spark-sql_2.10</artifactId>
+          <groupId>org.apache.spark</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hive-exec</artifactId>
+          <groupId>org.spark-project.hive</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hive-metastore</artifactId>
+          <groupId>org.spark-project.hive</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>avro</artifactId>
+          <groupId>org.apache.avro</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-httpclient</artifactId>
+          <groupId>commons-httpclient</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>calcite-avatica</artifactId>
+          <groupId>org.apache.calcite</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>calcite-core</artifactId>
+          <groupId>org.apache.calcite</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>httpclient</artifactId>
+          <groupId>org.apache.httpcomponents</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jackson-mapper-asl</artifactId>
+          <groupId>org.codehaus.jackson</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-codec</artifactId>
+          <groupId>commons-codec</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>joda-time</artifactId>
+          <groupId>joda-time</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jodd-core</artifactId>
+          <groupId>org.jodd</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>datanucleus-core</artifactId>
+          <groupId>org.datanucleus</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>libthrift</artifactId>
+          <groupId>org.apache.thrift</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>libfb303</artifactId>
+          <groupId>org.apache.thrift</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>avro-mapred</artifactId>
+          <groupId>org.apache.avro</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jsr305</artifactId>
+          <groupId>com.google.code.findbugs</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>hamcrest-core</artifactId>
+          <groupId>org.hamcrest</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.10</artifactId>
+      <version>2.2.4</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <properties>
+    <avro.version>1.7.7</avro.version>
+    <scala.binary.version>2.10</scala.binary.version>
+    <jackson.version>2.8.7</jackson.version>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <spark.version>1.6.0</spark.version>
+    <maven.compiler.target>1.8</maven.compiler.target>
+  </properties>
+</project>
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/pom.xml
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/pom.xml b/griffin-streaming-model/pom.xml
new file mode 100644
index 0000000..ac96542
--- /dev/null
+++ b/griffin-streaming-model/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.griffin</groupId>
+    <artifactId>griffin-crawler-model</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+
+        <spark.version>1.6.0</spark.version>
+        <scala.binary.version>2.10</scala.binary.version>
+        <avro.version>1.7.7</avro.version>
+
+        <jackson.version>2.8.7</jackson.version>
+        <!--<javax.validation.version>1.1.0.Final</javax.validation.version>-->
+        <!--<hibernate.validator.version>5.4.0.Final</hibernate.validator.version>-->
+        <!--<javax.el.version>2.2.4</javax.el.version>-->
+
+    </properties>
+
+    <dependencies>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.kafka</groupId>-->
+            <!--<artifactId>kafka_${scala.binary.version}</artifactId>-->
+            <!--<version>0.8.2.1</version>-->
+            <!--<exclusions>-->
+                <!--<exclusion>-->
+                    <!--<artifactId>jmxri</artifactId>-->
+                    <!--<groupId>com.sun.jmx</groupId>-->
+                <!--</exclusion>-->
+                <!--<exclusion>-->
+                    <!--<artifactId>jmxtools</artifactId>-->
+                    <!--<groupId>com.sun.jdmk</groupId>-->
+                <!--</exclusion>-->
+                <!--<exclusion>-->
+                    <!--<artifactId>jms</artifactId>-->
+                    <!--<groupId>javax.jms</groupId>-->
+                <!--</exclusion>-->
+                <!--<exclusion>-->
+                    <!--<artifactId>junit</artifactId>-->
+                    <!--<groupId>junit</groupId>-->
+                <!--</exclusion>-->
+            <!--</exclusions>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>junit</groupId>-->
+            <!--<artifactId>junit</artifactId>-->
+            <!--<version>3.8.1</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
+
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!--<dependency>-->
+        <!--<groupId>org.apache.spark</groupId>-->
+        <!--<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>-->
+        <!--<version>${spark.version}</version>-->
+        <!--</dependency>-->
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalaj</groupId>
+            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+
+        <!--<dependency>-->
+            <!--<groupId>javax.validation</groupId>-->
+            <!--<artifactId>validation-api</artifactId>-->
+            <!--<version>${javax.validation.version}</version>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.hibernate</groupId>-->
+            <!--<artifactId>hibernate-validator</artifactId>-->
+            <!--<version>${hibernate.validator.version}</version>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>javax.el</groupId>-->
+            <!--<artifactId>javax.el-api</artifactId>-->
+            <!--<version>${javax.el.version}</version>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.glassfish.web</groupId>-->
+            <!--<artifactId>javax.el</artifactId>-->
+            <!--<version>${javax.el.version}</version>-->
+        <!--</dependency>-->
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>2.2.4</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.colobu.kafka.ConsumerExample</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/resources/config.json b/griffin-streaming-model/src/main/resources/config.json
new file mode 100644
index 0000000..eaa6452
--- /dev/null
+++ b/griffin-streaming-model/src/main/resources/config.json
@@ -0,0 +1,190 @@
+{
+  "spark": {
+    "app.name": "GriffinAccuStreamingApp",
+    "log.level": "INFO",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafka.maxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    },
+    "streaming.checkpoint.dir": "hdfs:///test/kafka/cp",
+    "streaming.batch.interval.seconds": 5,
+    "streaming.sample.interval.seconds": 5
+  },
+
+  "type.config": {
+    "dq": "accuracy",
+    "dataAsset": {
+      "source": "kafka",
+      "target": "kafka"
+    }
+  },
+
+  "dataAsset": {
+    "source": {
+      "kafka.config": {
+        "bootstrap.servers": "10.9.246.187:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "crwaler-in",
+      "pre.process": {
+        "sample": {
+          "need.sample": true,
+          "ratio": 1
+        },
+        "parse": {
+          "in.type": "json",
+          "out.type": "map",
+          "config": {
+            "out.schema": [
+              {
+                "name": "url",
+                "type": "string",
+                "default.value": "",
+                "extract.steps": ["json", ".seeds", "[*]", "json", ".url"]
+              },
+              {
+                "name": "createdts",
+                "type": "bigint",
+                "default.value": "0",
+                "extract.steps": ["json", ".seeds", "[*]", "json", 
".metadata", "json", ".tracker", ".crawlRequestCreateTS"]
+              }
+            ]
+          }
+        }
+      },
+      "dump.config": {
+        "table.name": "cin",
+        "dir": "hdfs:///griffin/streaming/dump",
+        "schema": [
+          {
+            "name": "url",
+            "type": "string",
+            "default.value": ""
+          },
+          {
+            "name": "createdts",
+            "type": "bigint",
+            "default.value": "0"
+          }
+        ]
+      },
+      "schema": [
+        {
+          "name": "url",
+          "type": "string",
+          "default.value": ""
+        },
+        {
+          "name": "createdts",
+          "type": "bigint",
+          "default.value": "0"
+        }
+      ]
+    },
+    "target": {
+      "kafka.config": {
+        "bootstrap.servers": "10.9.246.187:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "crawler-out",
+      "pre.process": {
+        "parse": {
+          "in.type": "json",
+          "out.type": "map",
+          "config": {
+            "out.schema": [
+              {
+                "name": "url",
+                "type": "string",
+                "default.value": "",
+                "extract.steps": ["json", ".groups", "[0]", ".attrsList", 
"[.name=URL]", ".values", "[0]"]
+              },
+              {
+                "name": "createdts",
+                "type": "bigint",
+                "default.value": "0",
+                "extract.steps": ["json", ".groups", "[0]", ".attrsList", 
"[.name=CRAWLMETADATA]", ".values", "[0]", "json", ".tracker", 
".crawlRequestCreateTS"]
+              }
+            ]
+          }
+        }
+      },
+      "dump.config": {
+        "table.name": "crawlerout",
+        "dir": "hdfs:///griffin/streaming/dump",
+        "schema": [
+          {
+            "name": "url",
+            "type": "string",
+            "default.value": ""
+          },
+          {
+            "name": "createdts",
+            "type": "bigint",
+            "default.value": "0"
+          }
+        ]
+      },
+      "schema": [
+        {
+          "name": "url",
+          "type": "string",
+          "default.value": ""
+        },
+        {
+          "name": "createdts",
+          "type": "bigint",
+          "default.value": "0"
+        }
+      ]
+    }
+  },
+
+  "dq.config": {
+    "accuracy": {
+      "match.valid.time": {
+        "begin": 0,
+        "end": 24,
+        "unit": "hour"
+      },
+      "mapping": [
+        {
+          "source.name": "url",
+          "target.name": "url",
+          "isPK": true
+        }, {
+          "source.name": "createdts",
+          "target.name": "createdts",
+          "isPK": true
+        }
+      ]
+    }
+  },
+
+  "recorder": {
+    "types": ["hdfs", "post"],
+    "metric.name": "nrt_accuracy_crawler",
+    "config": {
+      "hdfs.dir": "hdfs:///griffin/streaming/record/",
+      "post.url": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/";,
+      "post.metric.name": "nrt_accuracy_crawler"
+    }
+  },
+
+  "cleaner": {
+
+  },
+
+  "retry": {
+    "need.retry": true,
+    "next.retry.seconds": 300,
+    "interval.seconds": 60
+  }
+
+
+}
\ No newline at end of file
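
For reference, here is a minimal sketch of how a JSON config in this shape could be loaded into a Scala map with jackson-databind and jackson-module-scala (both declared in the module pom). The ConfigSketch object and the literal file name are illustrative assumptions, not part of this patch:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    import scala.io.Source

    object ConfigSketch {
      def main(args: Array[String]): Unit = {
        // register the Scala module so JSON objects deserialize to Scala Maps
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)

        val text = Source.fromFile("config.json").mkString   // hypothetical path
        val conf = mapper.readValue(text, classOf[Map[String, Any]])

        // e.g. inspect the "spark" block defined above
        println(conf.get("spark"))
      }
    }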

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties b/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties
new file mode 100644
index 0000000..b54b5e4
--- /dev/null
+++ b/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties
@@ -0,0 +1,2 @@
+bootstrap.servers = 10.9.246.187:9092
+group.id =
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala
new file mode 100644
index 0000000..1ab4df8
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala
@@ -0,0 +1,71 @@
+package org.apache.griffin
+
+import org.apache.griffin.algo._
+import org.apache.griffin.config.ConfigFileReader
+import org.apache.griffin.config.params._
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+
+import scala.collection.JavaConverters._
+
+object Application extends Logging {
+//object Application {
+
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      logError("Usage: class [<config-files>]")
+      sys.exit(-1)
+    }
+
+    val Array(configFile) = args
+
+    // read config file
+    val configReader = ConfigFileReader(configFile)
+//    val allParam = configReader.readConfig[AllParam]
+    val allParam = configReader.readConfig
+
+    // spark config
+    val sparkParam = allParam.sparkParam
+
+    // type config
+    val typeConfigParam = allParam.typeConfigParam
+    val dqType = DqType.parse(typeConfigParam.dqType)
+    val dataAssetTypeMap = typeConfigParam.dataAssetTypeMap
+
+    // data assets
+    val dataAssetParamMap = allParam.dataAssetParamMap
+
+    // validation of params for algo
+    val validation = dqType match {
+      case AccuracyType => {
+        (dataAssetTypeMap.contains("source") && dataAssetParamMap.contains("source")) &&
+          (dataAssetTypeMap.contains("target") && dataAssetParamMap.contains("target"))
+      }
+      case _ => (dataAssetTypeMap.contains("source") && dataAssetParamMap.contains("source"))
+    }
+    if (!validation) {
+      logError("Config Error!")
+      sys.exit(-1)
+    }
+
+    // choose algo
+//    val algo: Algo = dqType match {
+//      case AccuracyType => {
+//        dataAssetTypeMap.get("source") match {
+//          case Some("kafka") => StreamingAccuracyAlgo(sparkParam, 
dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get)
+//          case _ => StreamingAccuracyAlgo(sparkParam, 
dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) // 
fixme: it's wrong type here
+//        }
+//      }
+//      case _ => {
+//        // fixme: it's wrong type here
+//        StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get)
+//      }
+//    }
+    val algo: Algo = StreamingAccuracyAlgo4Crawler(allParam)
+
+    algo.run
+
+  }
+
+}
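
As an aside, the validation rule above can be read as: every dq type needs a "source" asset, and accuracy additionally needs a "target". A hedged sketch of that rule as a standalone helper (the validParams name is hypothetical; the map shapes are assumed from the code above):

    // illustrative only: the accuracy check requires both source and target assets
    def validParams(dqType: DqType,
                    types: Map[String, String],
                    params: Map[String, DataAssetParam]): Boolean = {
      val hasSource = types.contains("source") && params.contains("source")
      dqType match {
        case AccuracyType => hasSource && types.contains("target") && params.contains("target")
        case _            => hasSource
      }
    }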

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala
new file mode 100644
index 0000000..f09be1b
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.algo
+
+import org.apache.griffin.config.params._
+
+
+trait AccuracyAlgo extends Algo {
+
+  val sourceDataParam: DataAssetParam
+  val targetDataParam: DataAssetParam
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala
new file mode 100644
index 0000000..d7f51b3
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.algo
+
+import org.apache.griffin.config.params._
+
+trait Algo extends Serializable {
+
+  val sparkParam: SparkParam
+
+  def run(): Unit
+
+}
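
As a hedged illustration of the contract this trait defines (not part of this patch), a concrete measure only has to carry its SparkParam and implement run(); PrintAlgo below is a hypothetical name:

    import org.apache.griffin.config.params.SparkParam

    // hypothetical minimal Algo implementation, for illustration only
    case class PrintAlgo(sparkParam: SparkParam) extends Algo {
      def run(): Unit = {
        // a real measure would build its Spark context and pipeline here
        println(s"running measure for app ${sparkParam.appName}")
      }
    }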

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala
new file mode 100644
index 0000000..f0f2023
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala
@@ -0,0 +1,27 @@
+package org.apache.griffin.algo
+
+import org.apache.griffin.config.params._
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+case class StreamingAccuracyAlgo(val sparkParam: SparkParam,
+                                 val sourceDataParam: DataAssetParam,
+                                 val targetDataParam: DataAssetParam
+                                ) extends AccuracyAlgo {
+
+  def run(): Unit = {
+    // create context
+//    val sparkConf = new SparkConf().setAppName(sparkParam.appName)
+//    sparkConf.setAll(sparkParam.config)
+//
+//    val sc = new SparkContext(sparkConf)
+//    sc.setLogLevel(sparkParam.logLevel)
+//    val sqlContext = new HiveContext(sc)
+//    val ssc = new StreamingContext(sc, Seconds(sparkParam.interval))
+//    ssc.checkpoint(sparkParam.cpDir)
+
+
+  }
+
+}
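
The commented block above sketches the intended context setup. Against the Spark 1.6 APIs declared in the pom, a runnable version of that setup could look like the following (the parameter names are assumptions based on the fields used elsewhere in this patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createStreamingContext(appName: String, batchSeconds: Long, cpDir: String): StreamingContext = {
      val sparkConf = new SparkConf().setAppName(appName)
      val sc = new SparkContext(sparkConf)
      val sqlContext = new HiveContext(sc)        // hive access for the dump tables
      val ssc = new StreamingContext(sc, Seconds(batchSeconds))
      ssc.checkpoint(cpDir)                       // enables recovery via getOrCreate
      ssc
    }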

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala
new file mode 100644
index 0000000..caa7ad5
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala
@@ -0,0 +1,770 @@
+package org.apache.griffin.algo
+
+import java.util.Date
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.algo.core.AccuracyCore
+import org.apache.griffin.cache.{CacheProcesser, CachedAccuData}
+import org.apache.griffin.config.params._
+import org.apache.griffin.dump._
+import org.apache.griffin.prep.parse.{DataParser, Json2MapParser}
+import org.apache.griffin.record.{Recorder, RecorderFactory}
+import org.apache.griffin.utils.HdfsUtil
+import org.apache.spark.rdd.{RDD, EmptyRDD}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.griffin.record.result.{AccuResult, Result}
+
+
+case class StreamingAccuracyAlgo4Crawler( val allParam: AllParam
+                                        ) extends AccuracyAlgo {
+
+  val sparkParam: SparkParam = allParam.sparkParam
+  val sourceDataParam: DataAssetParam = allParam.dataAssetParamMap.get("source").get
+  val targetDataParam: DataAssetParam = allParam.dataAssetParamMap.get("target").get
+
+  // current dumped minute
+  private var dumpMin: Long = 0L
+
+  // current checked minute
+  private var checkMin: Long = 0L
+
+  // next fire time
+  private var nextFireTime: Long = 0L
+
+
+
+  def run(): Unit = {
+    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir,
+      () => {
+        try {
+          createContext()
+        } catch {
+          case runtime: RuntimeException => {
+            throw runtime
+          }
+        }
+      })
+
+    ssc.start()
+    ssc.awaitTermination()
+    ssc.stop(stopSparkContext=true, stopGracefully=true)
+  }
+
+  def createContext(): StreamingContext = {
+    // create context
+    val sparkConf = new SparkConf().setAppName(sparkParam.appName)
+    sparkConf.setAll(sparkParam.config)
+
+    val sc = new SparkContext(sparkConf)
+    sc.setLogLevel(sparkParam.logLevel)
+    val sqlContext = new HiveContext(sc)
+    val interval = Seconds(sparkParam.batchInterval)
+    val ssc = new StreamingContext(sc, interval)
+    ssc.checkpoint(sparkParam.cpDir)
+
+    // emptyRDD
+    val emptyRdd = sc.emptyRDD[(Product, (Map[String, Any], Map[String, Any]))]
+
+    // source data direct stream
+    val sourceDStream = createDirectStream(ssc, sourceDataParam)
+
+    // target data direct stream
+    val targetDStream = createDirectStream(ssc, targetDataParam)
+
+    // accuracy param
+    val accuracyParam = allParam.dqConfigParam.accuracy
+    val matchValidTime = accuracyParam.matchValidTime
+    val timeRange = (getTimeMs(matchValidTime.begin, matchValidTime.unit), getTimeMs(matchValidTime.end, matchValidTime.unit))
+
+    val mapping = accuracyParam.mapping
+    // get mapping Keys and Values
+    val mappingKey = mapping.filter(_.isPK)
+    val mappingValue = mapping
+
+    // retry param
+    val retryParam = allParam.retryParam
+    val nextRetryInterval = retryParam.nextRetryInterval * 1000
+    val retryPollInterval = retryParam.interval * 1000
+    val retryInterval = Seconds(retryParam.interval)
+    val cacheProcesser = CacheProcesser(retryParam)
+
+    // recorder
+    val recorderFactory = RecorderFactory(allParam.recorderParam)
+
+    //--- 1. target part ---
+
+    // target data from kafka (crawler output)
+    val targetDumpConfigParam = targetDataParam.dumpConfigParam
+    val targetPrepParam = targetDataParam.prepParam
+    val targetSchema = targetDataParam.schema
+
+    // target parser
+    val targetParser = Json2MapParser(targetPrepParam.parseParam.configParam)
+
+    // create hive table for target data
+    val targetDumpWrapper = DumpWrapper(List[DumpWrapperInfo]())
+    val targetPartitionDef = List[(String, String)](("hr", "bigint"), ("min", "bigint"))
+    val (targetCreateTableSql, targetDumpDir) = genCreateTableSql(targetDumpConfigParam, targetDumpWrapper, targetPartitionDef)
+    val targetTableName = targetDumpConfigParam.tableName
+//    val fieldSep = targetDumpConfigParam.fieldSep
+//    val lineSep = targetDumpConfigParam.lineSep
+    val targetFieldSep = ","
+    val targetLineSep = "\n"
+    sqlContext.sql(targetCreateTableSql)
+
+    // target dstream dump process
+    targetDStream.foreachRDD((rdd, time) => {
+      val ms = time.milliseconds
+      val min = getTimeToUnit(ms, "min")
+      val hour = getTimeToUnit(ms, "hour")
+      val partitions = List[(String, Any)](("hr", hour), ("min", min))
+
+      val partitionPath = genPartitionHdfsPath(partitions)
+      val path = s"${targetDumpDir}/${partitionPath}/${ms}"
+
+      // parse each message
+      val recordRdd: RDD[String] = rdd.flatMap { kv =>
+        val msg = kv._2
+
+        // parse rdd msg
+        val res: Seq[Map[String, Any]] = targetParser.parse(msg)
+
+        // the sequence is the same with schema sequence, no need to sort here
+        val records = res.map { r =>
+          r.values.mkString(targetFieldSep)
+        }
+
+        records
+      }
+
+      if (!recordRdd.isEmpty) {
+
+        // save to hdfs
+        HdfsFileDumpUtil.dump(path, recordRdd, targetLineSep)
+
+        // update hive
+        val partitionSql = genAddPartitionSql(targetTableName, partitions)
+        sqlContext.sql(partitionSql)
+      }
+
+      // update dumpHourMin
+      updateDumpMin(min)
+    })
+
+    //--- 2. source part ---
+
+    // source data from kafka (crawler input)
+    val sourceDumpConfigParam = sourceDataParam.dumpConfigParam
+    val sourcePrepParam = sourceDataParam.prepParam
+    val sourceSchema = sourceDataParam.schema
+    val sourceSampleParam = sourcePrepParam.sampleParam
+
+    // source parser
+    val sourceParser = Json2MapParser(sourcePrepParam.parseParam.configParam)
+
+    // create hive table for source data
+    val sourceDumpWrapper = DumpWrapper(List[DumpWrapperInfo](TimeGroupInfo, NextFireTimeInfo, MismatchInfo, ErrorInfo))
+    val sourcePartitionDef = List[(String, String)](("firetime", "bigint"))
+    val (sourceCreateTableSql, sourceDumpDir) = genCreateTableSql(sourceDumpConfigParam, sourceDumpWrapper, sourcePartitionDef)
+    val sourceTableName = sourceDumpConfigParam.tableName
+    val sourceFieldSep = ","
+    val sourceLineSep = "\n"
+    sqlContext.sql(sourceCreateTableSql)
+
+    // get source dump keys
+    val sourceDumpValueKeys = sourceDumpConfigParam.schema.map(_.name)
+    val sourceDumpInfoKeys = sourceDumpWrapper.wrapKeys
+
+    // source dstream parse and compare process
+    val sampleInterval = Seconds(sparkParam.sampleInterval)
+    sourceDStream.foreachRDD((rdd, time) => {
+      val ms = time.milliseconds
+
+      val batchTime = new Date()
+      println(s"++++++  ${batchTime}  ++++++")
+
+      val curMin = getTimeToUnit(ms, "min")
+
+      // get next fire time
+      val cft = getNextFireTime
+      val (nft, fireNow, updateHive) = if (cft <= 0) {
+        initNextFireTime(ms, nextRetryInterval, retryPollInterval)
+        (getNextFireTime, false, true)
+      } else if (cft <= ms) {
+        updateNextFireTime(nextRetryInterval)
+        (getNextFireTime, true, true)
+      } else (cft, false, false)
+
+      // get check min
+      val icm = getCheckMin
+      val curCheckMin = if (icm <= 0) {
+        updateCheckMin(curMin)
+        getCheckMin
+      } else icm
+
+      // dump path
+      val partitions = List[(String, Any)](("firetime", nft))
+      val partitionPath = genPartitionHdfsPath(partitions)
+      val dumpPath = s"${sourceDumpDir}/${partitionPath}/${ms}"
+      val dumpRetryPath = s"${sourceDumpDir}/${partitionPath}/${ms}.r"
+
+      // update hive
+      if (updateHive) {
+        val partitionSql = genAddPartitionSql(sourceTableName, partitions)
+        sqlContext.sql(partitionSql)
+      }
+
+      // sample time process
+      if (time.isMultipleOf(sampleInterval)) {
+        val recorders = recorderFactory.getRecorders(ms)
+
+        // start
+        recorders.foreach(_.start)
+
+        // sample data
+        val sampleRdd = if (sourceSampleParam.needSample) {
+          sampleFunc(rdd, sourceSampleParam.ratio)
+        } else rdd
+
+        val sampleCount = sampleRdd.count
+        println(s"===== sampleCount: ${sampleCount} =====")
+
+        // parse data
+        if (sampleRdd.isEmpty) {
+          // empty source data, do nothing here
+          recorders.foreach(_.info(ms, "empty source data"))
+        } else {
+          // parse data
+          val parseRdd: RDD[Map[String, Any]] = sampleRdd.flatMap { kv =>
+            val msg = kv._2
+            // parse msg
+            sourceParser.parse(msg)
+          }
+
+          val parseCount = parseRdd.count
+          println(s"===== parseCount: ${parseCount} =====")
+
+          // get source data
+          val sourceRdd: RDD[(Product, Map[String, Any])] = parseRdd.map { row =>
+            val keys: List[AnyRef] = mappingKey.map { mp =>
+              row.get(mp.sourceName) match {
+                case Some(v) => v.asInstanceOf[AnyRef]
+                case None => null
+              }
+            }
+            val values: Map[String, Any] = mappingValue.map { mp =>
+              val v = row.get(mp.sourceName) match {
+                case Some(v) => v
+                case None => null
+              }
+              (mp.sourceName, v)
+            }.toMap
+            (toTuple(keys) -> values)
+          }
+          val sourceCount = sourceRdd.count
+
+          println(s"===== first time sourceCount: ${sourceCount} =====")
+
+          // wrap source data
+          val sourceWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceRdd.map { r =>
+            val wrappedData = Map[String, Any](
+              (TimeGroupInfo.key -> ms),
+              (NextFireTimeInfo.key -> nft),
+              (MismatchInfo.key -> ""),
+              (ErrorInfo.key -> "")
+            )
+            (r._1, (r._2, wrappedData))
+          }
+
+          val (result, missRdd, matchRdd) = accu(curMin, sourceWrappedRdd)
+
+          val missingRddCount1 = missRdd.count
+          println(s"===== missingRddCount1: ${missingRddCount1} =====")
+
+          // record result
+          recorders.foreach(_.accuracyResult(ms, result))
+
+          val missStrings = missRdd.map { row =>
+            val (key, (value, info)) = row
+            s"${value} [${info.getOrElse(MismatchInfo.key, "unknown")}]"
+          }.collect()
+
+          // record missing records
+          recorders.foreach(_.accuracyMissingRecords(missStrings))
+
+          // cache
+          val cacheData = CachedAccuData()
+          cacheData.result = result
+          cacheData.curTime = ms
+          cacheData.deadTime = timeRange._2 + ms
+          cacheProcesser.cache(ms, cacheData)
+
+          // dump mismatch data to hive
+          if (!missRdd.isEmpty) {
+            // parse each record
+            val dumpMissingRdd: RDD[String] = missRdd.map { kv =>
+              val (_, (dt, inf)) = kv
+              val values = getDumpValues(dt, sourceDumpValueKeys)
+              val infos = getDumpValues(inf, sourceDumpInfoKeys)
+              (values ::: infos).mkString(sourceFieldSep)
+            }
+
+            // save to hdfs
+            HdfsFileDumpUtil.dump(dumpPath, dumpMissingRdd, sourceLineSep)
+
+          }
+
+        }
+
+        // finish
+        recorders.foreach(_.finish)
+
+      }
+
+      // retry
+      if (fireNow) {
+
+        val t1 = new Date().getTime
+        println("===== retrying begin =====")
+
+        // get current path
+        val partitions = List[(String, Any)](("firetime", cft))
+        val partitionPath = genPartitionHdfsPath(partitions)
+        val dumpedDirPath = s"${sourceDumpDir}/${partitionPath}"
+
+        println(s"===== nft: ${nft} =====")
+        println(s"===== cft: ${cft} =====")
+        println(s"===== dumpedDirPath: ${dumpedDirPath} =====")
+
+        val tt1 = new Date().getTime
+
+        // get missing data from hive
+        val partitionsRange = List(("firetime", cft))
+        val selectSql = genSelectSql(sourceTableName, partitionsRange)
+        val missingDataFrame: DataFrame = sqlContext.sql(selectSql)
+
+        val tt2 = new Date().getTime
+        println(s"--------------- get source data from hive time: ${tt2 - tt1} ---------------")
+
+        // get missing data
+        val missingSourceWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = missingDataFrame.map { row =>
+          val keys: List[AnyRef] = mappingKey.map { mp =>
+            row.getAs[Any](mp.sourceName).asInstanceOf[AnyRef]
+          }
+          val values: Map[String, Any] = sourceDumpValueKeys.map { key =>
+            val v = row.getAs[Any](key)
+            (key, v)
+          }.toMap
+          val infos: Map[String, Any] = sourceDumpInfoKeys.map { key =>
+            val v = row.getAs[Any](key)
+            (key, v)
+          }.toMap
+          (toTuple(keys) -> (values, infos))
+        }
+
+        val tt3 = new Date().getTime
+        println(s"--------------- wrap source data time: ${tt3 - tt2} ---------------")
+
+        if (!missingSourceWrappedRdd.isEmpty) {
+          val sourceCount = missingSourceWrappedRdd.count
+          println(s"===== rerty sourceCount: ${sourceCount} =====")
+
+          val (retryResult, stillMissingRdd, matchRdd) = accu(curCheckMin, missingSourceWrappedRdd)
+
+          val tt4 = new Date().getTime
+          println(s"--------------- accu time: ${tt4 - tt3} ---------------")
+
+          val missingRddCount2 = stillMissingRdd.count
+          println(s"===== missingRddCount2: ${missingRddCount2} =====")
+
+          val matchRddCount2 = matchRdd.count
+          println(s"===== matchRddCount2: ${matchRddCount2} =====")
+
+          // filter still need saving rdd
+          val savingRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = stillMissingRdd.filter { row =>
+            val (key, (value, info)) = row
+            info.get(TimeGroupInfo.key) match {
+              case Some(t: Long) => {
+                cacheProcesser.getCache(t) match {
+                  case Some(cache) => true
+                  case _ => false
+                }
+              }
+              case _ => false
+            }
+          }
+
+          def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
+                              ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
+            rdd.flatMap { row =>
+              val (key, (value, info)) = row
+              val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeGroupInfo.key) match {
+                case Some(t: Long) => Some((t, row))
+                case _ => None
+              }
+              b
+            }
+          }
+
+          // get missing results
+          val missingResults = reorgByTimeGroup(stillMissingRdd)
+          val missingResultsCount = missingResults.count
+          println(s"===== missingResultsCount: ${missingResultsCount} =====")
+
+          // get matched results
+          val matchResults = reorgByTimeGroup(matchRdd)
+          val matchResultsCount = matchResults.count
+          println(s"===== matchResultsCount: ${matchResultsCount} =====")
+
+          val groupedResults = missingResults.cogroup(matchResults)
+
+          val tt5 = new Date().getTime
+          println(s"--------------- cogroup time: ${tt5 - tt4} ---------------")
+
+//          val groupedMatchResults: RDD[(Long, Iterable[(Product, (Map[String, Any], Map[String, Any]))])] = matchResultsCount.groupByKey()
+
+          val groupedResultsCount = groupedResults.count
+          println(s"===== groupedResultsCount: ${groupedResultsCount} =====")
+
+          groupedResults.foreach(row => println(s"=== ${row._1}: ${row._2._1.size}, ${row._2._2.size} === ${ms} ==="))
+
+          val retry_time = new Date().getTime
+
+          // record
+          groupedResults.foreach { rslt =>
+            val (t, (missRes, matchRes)) = rslt
+            val curRecorders = recorderFactory.getRecorders(t)
+
+            cacheProcesser.getCache(t) match {
+              case Some(cache) => {
+                val matchCount = matchRes.size
+                // need to update result
+                if (matchCount > 0) {
+                  val accuCache = cache.asInstanceOf[CachedAccuData]
+                  val delta = AccuResult(missRes.size, (missRes.size + matchRes.size))
+                  val oldResult = accuCache.result
+                  val newResult = accuCache.result.updateResult(delta)
+
+                  // different result
+                  if (newResult.differsFrom(oldResult)) {
+                    accuCache.result = newResult
+
+                    // record result
+                    curRecorders.foreach(_.accuracyResult(ms, newResult))
+
+                    val missStrings = missRes.map { row =>
+                      val (key, (value, info)) = row
+                      s"${value} [${info.getOrElse(MismatchInfo.key, 
"unknown")}]"
+                    }
+                    // record missing records
+                    curRecorders.foreach(_.accuracyMissingRecords(missStrings))
+
+                    curRecorders.foreach(_.recordTime(ms, retry_time - t1))
+                    println(s"----- record here: [${ms}, (${t}: ${newResult})] 
-----")
+                  }
+                }
+              }
+              case _ => {
+                println(s"=== no cache of ${t}, need to clear ===")
+              }
+            }
+          }
+
+          val savingRddCount = savingRdd.count
+          println(s"===== savingRddCount: ${savingRddCount} =====")
+
+          val tt6 = new Date().getTime
+          println(s"--------------- record and get saving data time: ${tt6 - tt5} ---------------")
+
+          // caches refresh
+          cacheProcesser.refresh(ms)
+
+          // dump mismatch data to hive
+          if (!savingRdd.isEmpty) {
+            // parse each record
+            val dumpMissingRdd: RDD[String] = savingRdd.map { kv =>
+              val (_, (dt, inf)) = kv
+              val values = getDumpValues(dt, sourceDumpValueKeys)
+              val infos = getDumpValues(inf, sourceDumpInfoKeys)
+              (values ::: infos).mkString(sourceFieldSep)
+            }
+
+            // save to hdfs
+            HdfsFileDumpUtil.dump(dumpRetryPath, dumpMissingRdd, sourceLineSep)
+
+            val tt7 = new Date().getTime
+            println(s"--------------- dump saving data time: ${tt7 - tt6} ---------------")
+          }
+        } else {
+          println("===== retry source data empty =====")
+        }
+
+        val tt7 = new Date().getTime
+
+        // remove currently calculated dumped data
+        HdfsUtil.deleteHdfsPath(dumpedDirPath)
+
+        val tt8 = new Date().getTime
+        println(s"--------------- remove outtime data time: ${tt8 - tt7} 
---------------")
+
+        val t2 = new Date().getTime
+        println(s"===== retrying end [using ${t2 - t1} ms] =====")
+      }
+
+
+    })
+
+    def accu(curMin: Long, sourceWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
+            ): (AccuResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
+      val t1 = new Date().getTime
+
+      // get dump min
+      val dumpedMin = getDumpedMin
+
+      val mRange = (curMin, dumpedMin)
+      println(s"===== mRange: ${mRange} =====")
+
+      val ret = if (dumpReady(curMin, dumpedMin)) {
+        // get target data from hive
+        val minRange = (curMin, dumpedMin)
+        val partitionsRange = List(("min", minRange))
+        val selectSql = genSelectSql(targetTableName, partitionsRange)
+        val targetDataFrame: DataFrame = sqlContext.sql(selectSql)
+
+        // get target data
+        val targetWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataFrame.map { row =>
+          val keys: List[AnyRef] = mappingKey.map { mp =>
+            row.getAs[Any](mp.targetName).asInstanceOf[AnyRef]
+          }
+          val values: Map[String, Any] = mappingValue.map { mp =>
+            val v = row.getAs[Any](mp.targetName)
+            (mp.targetName, v)
+          }.toMap
+          (toTuple(keys) -> (values, Map[String, Any]()))
+        }
+
+        val targetCount = targetWrappedRdd.count
+        println(s"===== targetCount: ${targetCount} =====")
+
+        val (result, missRdd, matchRdd) = if (targetWrappedRdd.isEmpty) {
+          val sourceCount = sourceWrappedRdd.count
+          (AccuResult(sourceCount, sourceCount), sourceWrappedRdd, emptyRdd)
+        } else {
+          // cogroup source and target
+          val allKvs = sourceWrappedRdd.cogroup(targetWrappedRdd)
+
+          // accuracy algorithm calculation
+          val (accuResult, missingRdd, matchingRdd) = AccuracyCore.accuracy(allKvs)
+
+          println("source count: " + accuResult.totalCount + " missed count: " + accuResult.missCount)
+
+          val missingCount = missingRdd.count
+          println(s"===== missingCount: ${missingCount} =====")
+
+          (accuResult, missingRdd, matchingRdd)
+        }
+
+        // update check min
+        updateCheckMin(dumpedMin + 1)
+
+        (result, missRdd, matchRdd)
+
+      } else {
+        println("===== dump data not ready =====")
+        val sourceCount = sourceWrappedRdd.count
+        (AccuResult(sourceCount, sourceCount), sourceWrappedRdd, emptyRdd)
+      }
+
+      val t2 = new Date().getTime
+
+      println(s"===== accu using time ${t2 - t1} ms =====")
+
+      ret
+    }
+
+    ssc
+
+  }
+
+  def createDirectStream(ssc: StreamingContext, dataAssetParam: DataAssetParam): InputDStream[(String, String)] = {
+    val kafkaParam = dataAssetParam.kafkaConfig
+    val topics = dataAssetParam.topics.split(",").toSet
+    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParam,
+      topics
+    )
+  }
+
+  def genCreateTableSql(dumpConfigParam: DumpParam, dumpWrapper: DumpWrapper, partitionDef: List[(String, String)]): (String, String) = {
+    val tableName = dumpConfigParam.tableName
+    val targetDumpDir = s"${dumpConfigParam.dir}/${tableName}"
+
+    val schema = dumpConfigParam.schema
+    val schemaCols = schema.map { field =>
+      (field.name, field.fieldType)
+    }
+
+    val wrapperCols = dumpWrapper.wrapSchema
+
+    val colsSql = (schemaCols ++ wrapperCols).map(col => (s"`${col._1}` ${col._2}")).mkString(", ")
+
+//    val partitions = dumpConfigParam.partitions
+//    val partitionCols = partitions.map { field =>
+//      (field.name, field.fieldType)
+//    }.toMap
+//    val partitionsSql = partitionCols.map(prtn => (s"`${prtn._1}` ${prtn._2}")).mkString(", ")
+    val partitionsSql = partitionDef.map(prtn => (s"`${prtn._1}` ${prtn._2}")).mkString(", ")
+
+    val sql =
+      s"""CREATE EXTERNAL TABLE IF NOT EXISTS `${tableName}`
+         |(${colsSql}) PARTITIONED BY (${partitionsSql})
+         |ROW FORMAT DELIMITED
+         |FIELDS TERMINATED BY ','
+         |LINES TERMINATED BY '\n'
+         |STORED AS TEXTFILE
+         |LOCATION '${targetDumpDir}'""".stripMargin
+    (sql, targetDumpDir)
+  }
+
+
+  def genPartitionHdfsPath(partitions: List[(String, Any)]): String = {
+    partitions.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
+  }
+
+  def genAddPartitionSql(tableName: String, partitions: List[(String, Any)]): String = {
+    val partitionSql = partitions.map(prtn => (s"`${prtn._1}`=${prtn._2}")).mkString(", ")
+    val sql = s"""ALTER TABLE `${tableName}` ADD IF NOT EXISTS PARTITION (${partitionSql})"""
+    sql
+  }
+
+  def genDropPartitionSql(tableName: String, partitions: List[(String, Any)]): String = {
+    val partitionSql = partitions.map(prtn => (s"`${prtn._1}`=${prtn._2}")).mkString(", ")
+    val sql = s"""ALTER TABLE `${tableName}` DROP IF EXISTS PARTITION (${partitionSql})"""
+    sql
+  }
+
+  def genSelectSql(tableName: String, partitionsRange: List[(String, Any)]): String = {
+    val whereClause = partitionsRange match {
+      case Nil => ""
+      case _ => {
+        val clause = partitionsRange.map { prtn =>
+          prtn match {
+            case (name, range: (Any, Any)) => s"""`${name}` BETWEEN '${range._1}' and '${range._2}'"""
+            case (name, value) => s"""`${name}`='${value}'"""
+            case _ => ""
+          }
+        }.filter(_.nonEmpty).mkString(" AND ")
+        s"WHERE ${clause}"
+      }
+    }
+    val sql = s"""SELECT * FROM `${tableName}` ${whereClause}"""
+    sql
+  }
+
+  def sampleFunc[T](src: RDD[T], ratio: Double, count: Int = 0): RDD[T] = {
+    if (src.isEmpty) {
+      println("===== empty source =====")
+      src
+    } else {
+      val sample = src.sample(false, ratio, System.nanoTime)
+      if (sample.isEmpty) {
+        if (count < 5) {
+          sampleFunc(src, ratio, count + 1)
+        } else {
+          println("===== empty sample, get source =====")
+          src
+        }
+      } else sample
+    }
+  }
+
+
+
+  def getTimeMs(t: Int, unit: String): Long = {
+    val lt = t.toLong
+    unit match {
+      case "ms" => lt
+      case "sec" => lt * 1000
+      case "min" => lt * 60 * 1000
+      case "hour" => lt * 60 * 60 * 1000
+      case "day" => lt * 24 * 60 * 60 * 1000
+      case _ => lt * 60 * 1000
+    }
+  }
+
+  def getTimeToUnit(lt: Long, unit: String): Long = {
+    unit match {
+      case "ms" => lt
+      case "sec" => lt / 1000
+      case "min" => lt / (60 * 1000)
+      case "hour" => lt / (60 * 60 * 1000)
+      case "day" => lt / (24 * 60 * 60 * 1000)
+      case _ => lt / (60 * 1000)
+    }
+  }
+
+  def updateDumpMin(min: Long): Unit = {
+    this.dumpMin = min
+  }
+
+  def getDumpedMin(): Long = {
+    this.dumpMin - 1
+  }
+
+  def dumpReady(beginMin: Long, dumpedMin: Long): Boolean = {
+    beginMin <= dumpedMin
+  }
+
+
+  def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+    val tupleClass = Class.forName("scala.Tuple" + as.size)
+    tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+  }
+
+  def initNextFireTime(t: Long, nextInterval: Long, precInterval: Long): Unit = {
+    val nt = nextInterval + t
+    this.nextFireTime = nt / precInterval * precInterval
+  }
+
+  def updateNextFireTime(nextInterval: Long): Unit = {
+    this.nextFireTime += nextInterval
+  }
+
+  def getNextFireTime(): Long = {
+    this.nextFireTime
+  }
+
+  def getDumpValues(data: Map[String, Any], keys: List[String]): List[String] = {
+    keys.map { k =>
+      data.get(k) match {
+        case Some(v) => if (v != null) v.toString else ""
+        case _ => ""
+      }
+    }
+  }
+
+  def getStringFromSchema(datas: (Map[String, Any], Map[String, Any]), dumpConfigParam: DumpParam, dumpWrapper: DumpWrapper): List[Option[_]] = {
+    val (dt, info) = datas
+    val schema = dumpConfigParam.schema
+    val schemaValues = schema.map { s =>
+      dt.get(s.name)
+    }
+    val infoValues = dumpWrapper.wrapValues(info)
+    schemaValues ::: infoValues
+  }
+
+  def getCheckMin(): Long = {
+    this.checkMin
+  }
+
+  def updateCheckMin(min: Long): Unit = {
+    this.checkMin = min
+  }
+
+}
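
A minimal standalone sketch of what the partition-SQL helpers above produce
(the table name and partition value are illustrative, not taken from this
patch):

    // Hedged sketch: mirrors the string-building in genAddPartitionSql above.
    object PartitionSqlSketch {
      def genAddPartitionSql(tableName: String, partitions: List[(String, Any)]): String = {
        val partitionSql = partitions.map(p => s"`${p._1}`=${p._2}").mkString(", ")
        s"ALTER TABLE `${tableName}` ADD IF NOT EXISTS PARTITION (${partitionSql})"
      }
      def main(args: Array[String]): Unit = {
        println(genAddPartitionSql("target_table", List(("min", 27000000L))))
        // prints: ALTER TABLE `target_table` ADD IF NOT EXISTS PARTITION (`min`=27000000)
      }
    }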

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala
new file mode 100644
index 0000000..b7a7c8e
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala
@@ -0,0 +1,92 @@
+package org.apache.griffin.algo.core
+
+import org.apache.griffin.dump._
+import org.apache.griffin.record.result.AccuResult
+import org.apache.spark.rdd.RDD
+
+
+object AccuracyCore {
+
+  type T = Map[String, Any]
+
+  def accuracy(allkvs: RDD[(Product, (Iterable[(T, T)], Iterable[(T, T)]))]
+              ): (AccuResult, RDD[(Product, (T, T))], RDD[(Product, (T, T))]) = {
+    val result: RDD[(Long, Long, List[(Product, (T, T))], List[(Product, (T, T))])] = allkvs.map { kv =>
+      val (k, (srcData, tgtData)) = kv
+//      val srcCount = srcData.size
+      val tgtCount = tgtData.size
+      val tgt: Set[T] = tgtData.map(_._1).toSet   // only get value
+
+      // result: (missCount, matchCount, missDataList, matchDataList)
+      val rslt = srcData.foldLeft((0L, 0L, List[(Product, (T, T))](), List[(Product, (T, T))]())) { (r, mapPair) =>
+        val (valueMap, infoMap) = mapPair
+        if (tgt.contains(valueMap)) {
+          val newInfoMap = infoMap + (MismatchInfo.key -> "matched")
+          val matchItem = (k, (valueMap, newInfoMap))
+          (r._1, r._2 + 1, r._3, r._4 :+ matchItem)
+        } else {
+          val newInfoMap = if (tgtCount == 0) {
+            infoMap + (MismatchInfo.key -> "no target")
+          } else if (tgtCount == 1) {
+            infoMap + (MismatchInfo.key -> ("target data: " + tgt.head.toString))
+          } else {
+            infoMap + (MismatchInfo.key -> "not match")
+          }
+          val missItem = (k, (valueMap, newInfoMap))
+          (r._1 + 1, r._2, r._3 :+ missItem, r._4)
+        }
+      }
+      rslt
+    }
+
+    val missRdd = result.flatMap(_._3)
+    val matchRdd = result.flatMap(_._4)
+
+    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
+      (cnt._1 + rcd._1, cnt._2 + rcd._2)
+    }
+    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
+      (c1._1 + c2._1, c1._2 + c2._2)
+    }
+    val countPair = result.aggregate((0L, 0L))(seq, comb)
+
+    (AccuResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
+  }
+
+//  def accuracy(allkvs: RDD[(Product, (Iterable[Map[String, Any]], Iterable[Map[String, Any]]))]): (AccuResult, List[String], RDD[(Product, Map[String, Any])]) = {
+//    val result: RDD[((Long, Long), List[String], List[(Product, Map[String, Any])])] = allkvs.map { kv =>
+//      val (k, (srcData, tgtData)) = kv
+//      val srcCount = srcData.size
+//      val tgtCount = tgtData.size
+//      val tgt = tgtData.toSet
+//      val rslt = srcData.foldLeft((0L, List[String](), List[(Product, Map[String, Any])]())) { (r, mp) =>
+//        if (tgt.contains(mp)) {
+//          r
+//        } else {
+//          val mp1 = if (tgtCount == 0) {
+//            mp + ("_error_" -> "no target")
+//          } else if (tgtCount == 1) {
+//            mp + ("_error_" -> tgt.head.toString)
+//          } else {
+//            mp + ("_error_" -> "not match")
+//          }
+//          (r._1 + 1, mp1.toString :: r._2, (k, mp) :: r._3)
+//        }
+//      }
+//      ((rslt._1, srcCount.toLong), rslt._2, rslt._3)
+//    }
+//
+//    val missingRdd = result.flatMap(_._3)
+//
+//    def seq(cnt: (Long, Long, List[String]), rcd: ((Long, Long), List[String], Any)): (Long, Long, List[String]) = {
+//      (cnt._1 + rcd._1._1, cnt._2 + rcd._1._2, cnt._3 ::: rcd._2)
+//    }
+//    def comb(c1: (Long, Long, List[String]), c2: (Long, Long, List[String])): (Long, Long, List[String]) = {
+//      (c1._1 + c2._1, c1._2 + c2._2, c1._3 ::: c2._3)
+//    }
+//    val missingCount = result.aggregate((0L, 0L, List[String]()))(seq, comb)
+//
+//    (AccuResult(missingCount._1, missingCount._2), missingCount._3, missingRdd)
+//  }
+
+}
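
A hedged sketch of the per-key counting that AccuracyCore.accuracy performs
inside the cogrouped RDD, shown on plain collections (the data values are
illustrative):

    object AccuracyFoldSketch {
      type T = Map[String, Any]
      def main(args: Array[String]): Unit = {
        val srcData: Iterable[T] = List(Map("id" -> 1), Map("id" -> 2))
        val tgt: Set[T] = Set(Map("id" -> 1))
        // fold source records into (missCount, matchCount), as in accuracy above
        val (miss, matched) = srcData.foldLeft((0L, 0L)) { (r, valueMap) =>
          if (tgt.contains(valueMap)) (r._1, r._2 + 1) else (r._1 + 1, r._2)
        }
        println(s"miss: ${miss}, match: ${matched}") // miss: 1, match: 1
      }
    }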

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala
new file mode 100644
index 0000000..c70ce77
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala
@@ -0,0 +1,54 @@
+package org.apache.griffin.cache
+
+import org.apache.griffin.config.params.RetryParam
+
+import scala.collection.mutable.{Map => MutableMap}
+
+case class CacheProcesser(retryParam: RetryParam) {
+
+  val cacheGroup: MutableMap[Long, CachedData] = MutableMap()
+
+  val needRetry = retryParam.needRetry
+  val nextRetryInterval = retryParam.nextRetryInterval * 1000
+
+  def cache(timeStamp: Long, cacheData: CachedData): Unit = {
+    if (!cacheData.cacheEnd) {
+      cacheGroup.get(timeStamp) match {
+        case Some(cd) => {
+          println("unnecessary to cache")
+        }
+        case _ => {
+          cacheGroup += (timeStamp -> cacheData)
+          println(s"add cache ${timeStamp}: ${cacheData.result}")
+        }
+      }
+    } else {
+      println("unnecessary to cache")
+    }
+  }
+
+//  def process(time: Long): Unit = {
+//    val curCacheGroup = cacheGroup.toMap
+//    val fireCaches = curCacheGroup.filter { pair => (time >= pair._2.nextFireTime) }
+//    println(s"=== retry group count: ${fireCaches.size} ===")
+//    fireCaches.foreach { pair =>
+//      val (_, cacheData) = pair
+//      println(s"=== retry: ${pair._1} ===")
+//      cacheData.recalcAndCache(time)
+//      cacheData.nextFireTime = time + nextRetryInterval
+//    }
+//  }
+
+  def refresh(t: Long): Unit = {
+    val curCacheGroup = cacheGroup.toMap
+    curCacheGroup.foreach(_._2.curTime = t)
+    val deadCache = curCacheGroup.filter(_._2.cacheEnd)
+    println(s"=== dead cache group count: ${deadCache.size} ===")
+    deadCache.keySet.foreach(cacheGroup -= _)
+  }
+
+  def getCache(timeStamp: Long): Option[CachedData] = {
+    cacheGroup.get(timeStamp)
+  }
+
+}
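
A hedged usage sketch of the cache lifecycle above (the timestamps and retry
values are illustrative): an entry is cached per timestamp, and refresh(t)
evicts every entry whose cacheEnd condition holds at time t.

    val processer = CacheProcesser(RetryParam(needRetry = true, nextRetryInterval = 60L, interval = 300L))
    val data = CachedAccuData()
    data.deadTime = 1000L
    processer.cache(100L, data)  // cached: curTime (0) <= deadTime, and no result yet
    processer.refresh(2000L)     // curTime (2000) > deadTime (1000), entry evicted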

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala
new file mode 100644
index 0000000..0d939e5
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala
@@ -0,0 +1,23 @@
+package org.apache.griffin.cache
+
+import org.apache.spark.rdd.RDD
+import org.apache.griffin.record.result.AccuResult
+
+case class CachedAccuData() extends CachedData {
+
+  type T = AccuResult
+
+//  var validMinRange: (Long, Long) = _
+
+//  var proc: (Long) => (T, RDD[(Product, Map[String, Any])], (Long, Long)) => (T, RDD[(Product, Map[String, Any])], (Long, Long)) = _
+
+//  def recalcAndCache(t: Long): Unit = {
+//    val res = proc(t)(result, rdd, validMinRange)
+//    unpersist
+//    result = res._1
+//    rdd = res._2
+//    validMinRange = res._3
+//    persist
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala
new file mode 100644
index 0000000..64474f8
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala
@@ -0,0 +1,27 @@
+package org.apache.griffin.cache
+
+import org.apache.spark.rdd.RDD
+import org.apache.griffin.record.result.Result
+
+
+trait CachedData {
+
+  type T <: Result
+
+  var result: T = _
+
+  var curTime: Long = _
+  var deadTime: Long = _
+//  var rdd: RDD[(Product, Map[String, Any])] = _
+
+//  var nextFireTime: Long = _
+
+//  def persist(): Unit = { rdd.cache }
+//  def unpersist(): Unit = { rdd.unpersist() }
+
+//  def recalcAndCache(t: Long): Unit
+  def cacheEnd(): Boolean = {
+    (curTime > deadTime) || (result != null && !result.needCalc)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala
new file mode 100644
index 0000000..f7e6fdf
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala
@@ -0,0 +1,20 @@
+package org.apache.griffin.config
+
+import org.apache.griffin.config.params._
+import org.apache.griffin.utils.JsonUtil
+
+case class ConfigFileReader(file: String) extends ConfigReader {
+
+//  def readConfig[T <: ConfigParam](implicit m : Manifest[T]): T = {
+//    val lines = scala.io.Source.fromFile(file).mkString
+//    val param = JsonUtil.fromJson[T](lines)
+//    param
+//  }
+
+  def readConfig: AllParam = {
+    val lines = scala.io.Source.fromFile(file).mkString
+    val param = JsonUtil.fromJson2AllParam(lines)
+    param
+  }
+
+}
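
A hedged usage sketch (the file path is illustrative): readConfig parses the
whole JSON file into an AllParam, whose fields are bound by the @JsonProperty
names declared on AllParam below ("spark", "type.config", "dataAsset",
"dq.config", "recorder", "retry").

    val allParam: AllParam = ConfigFileReader("config.json").readConfig
    println(allParam.retryParam.interval)  // the configured "interval.seconds"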

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala
new file mode 100644
index 0000000..5d87976
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config
+
+import org.apache.griffin.config.params.ConfigParam
+
+trait ConfigReader extends Serializable {
+
+//  def readConfig[T <: ConfigParam](implicit m : Manifest[T]): T
+
+  def readConfig: ConfigParam
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala
new file mode 100644
index 0000000..f4a263d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class AccuracyParam( @JsonProperty("mapping") mapping: List[MappingParam],
+                          @JsonProperty("match.valid.time") matchValidTime: TimeRangeParam
+                        ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala
new file mode 100644
index 0000000..7c7be6d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class AllParam( @JsonProperty("spark") sparkParam: SparkParam,
+                     @JsonProperty("type.config") typeConfigParam: TypeConfigParam,
+                     @JsonProperty("dataAsset") dataAssetParamMap: Map[String, DataAssetParam],
+                     @JsonProperty("dq.config") dqConfigParam: DqConfigParam,
+                     @JsonProperty("recorder") recorderParam: RecorderParam,
+                     @JsonProperty("retry") retryParam: RetryParam
+                   ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala
new file mode 100644
index 0000000..f97d071
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.config.params
+
+
+trait ConfigParam extends Serializable {
+
+  def merge(cp: ConfigParam): Unit = {}
+
+  def validate(): Boolean = true
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala
new file mode 100644
index 0000000..3647d8d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class DataAssetParam( @JsonProperty("kafka.config") kafkaConfig: Map[String, String],
+                           @JsonProperty("topics") topics: String,
+                           @JsonProperty("pre.process") prepParam: PrepParam,
+                           @JsonProperty("dump.config") dumpConfigParam: DumpParam,
+                           @JsonProperty("schema") schema: List[SchemaFieldParam]
+                         ) extends ConfigParam {
+
+}
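
A hedged wiring sketch (the "source" key, the allParam value and the ssc
StreamingContext are assumptions, not from this patch): each DataAssetParam
carries the Kafka settings consumed by createDirectStream earlier in this
patch.

    val sourceAsset = allParam.dataAssetParamMap("source")  // assumes a "source" entry in config
    val stream = createDirectStream(ssc, sourceAsset)       // InputDStream[(String, String)]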

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala
new file mode 100644
index 0000000..789bbb8
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class DqConfigParam( @JsonProperty("accuracy") accuracy: AccuracyParam
+                        ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala
new file mode 100644
index 0000000..c524614
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+sealed class DqType(val tp: String)
+
+case object AccuracyType extends DqType("accuracy")
+case object ProfileType extends DqType("profile")
+case object ValidityType extends DqType("validity")
+
+object DqType {
+  def parse(tp: String): DqType = new DqType(tp)
+}
\ No newline at end of file
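
One hedged observation: DqType.parse above always allocates a fresh DqType, so
it never returns the case objects (AccuracyType == DqType.parse("accuracy") is
false, since plain classes compare by reference). A variant that maps known
type strings onto the case objects would make equality checks work:

    def parse(tp: String): DqType = tp match {
      case "accuracy" => AccuracyType
      case "profile"  => ProfileType
      case "validity" => ValidityType
      case _          => new DqType(tp)
    }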

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala
new file mode 100644
index 0000000..bea1ea6
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class DumpParam( @JsonProperty("dir") dir: String,
+//                      @JsonProperty("field.separator") fieldSep: String,
+//                      @JsonProperty("line.separator") lineSep: String,
+                      @JsonProperty("table.name") tableName: String,
+                      @JsonProperty("schema") schema: List[SchemaFieldParam]
+                    ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala
new file mode 100644
index 0000000..80f94f4
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class MappingParam( @JsonProperty("source.name") sourceName: String,
+                         @JsonProperty("target.name") targetName: String,
+                         @JsonProperty("isPK") isPK: Boolean
+                         ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala
new file mode 100644
index 0000000..bf21d20
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class ParseConfigParam( @JsonProperty("out.schema") schema: List[SchemaFieldParam]
+                           ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala
new file mode 100644
index 0000000..d7d3d22
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class ParseParam( @JsonProperty("in.type") inType: String,
+                       @JsonProperty("out.type") outType: String,
+                       @JsonProperty("config") configParam: ParseConfigParam
+                     ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala
new file mode 100644
index 0000000..d8a186d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class PrepParam( @JsonProperty("sample") sampleParam: SampleParam,
+                      @JsonProperty("parse") parseParam: ParseParam
+                    ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala
new file mode 100644
index 0000000..e407a21
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class RecorderParam( @JsonProperty("types") types: List[String],
+                          @JsonProperty("metric.name") metricName: String,
+                          @JsonProperty("config") config: Map[String, String]
+                        ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala
new file mode 100644
index 0000000..46ebe07
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class RetryParam( @JsonProperty("need.retry") needRetry: Boolean,
+                       @JsonProperty("next.retry.seconds") nextRetryInterval: Long,
+                       @JsonProperty("interval.seconds") interval: Long
+                     ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala
new file mode 100644
index 0000000..3749702
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class SampleParam( @JsonProperty("need.sample") needSample: Boolean,
+                        @JsonProperty("ratio") ratio: Double
+                      ) extends ConfigParam {
+
+}
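
A hedged deserialization sketch (not part of the patch; assumes
jackson-module-scala on the classpath, which the @JsonProperty annotations
suggest):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = """{"need.sample": true, "ratio": 0.1}"""
    val param = mapper.readValue(json, classOf[SampleParam])
    println(param.ratio)  // 0.1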

