[GRIFFIN-6] Push griffin streaming measure

Push griffin streaming measure for a use case.
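This patch adds a new griffin-streaming-model module: a Spark Streaming accuracy measure that consumes source and target records from two Kafka topics, parses each message into a flat map via configurable JSON extract steps, dumps the parsed records into HDFS-backed external Hive tables, cogroups source against target on the configured primary keys, and retries unmatched source records until the configured match.valid.time window expires. The entry point org.apache.griffin.Application takes a single argument, the path of a config file shaped like src/main/resources/config.json; the Spark dependencies are marked provided, so the shaded jar built by the maven-shade-plugin is intended to be submitted to an existing cluster (e.g. via spark-submit --class org.apache.griffin.Application).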
Author: Liu <[email protected]> Closes #10 from bhlx3lyx7/GRIFFIN-6. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/f783cb78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/f783cb78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/f783cb78 Branch: refs/heads/master Commit: f783cb78ed0949e3e7d166ebcc382233475e2529 Parents: 87fd749 Author: Liu <[email protected]> Authored: Thu Mar 30 16:11:09 2017 +0800 Committer: William Guo <[email protected]> Committed: Thu Mar 30 16:11:09 2017 +0800 ---------------------------------------------------------------------- .../dependency-reduced-pom.xml | 319 ++++++++ griffin-streaming-model/pom.xml | 187 +++++ .../src/main/resources/config.json | 190 +++++ .../resources/kafkaConsumerConfig.properties | 2 + .../scala/org/apache/griffin/Application.scala | 71 ++ .../org/apache/griffin/algo/AccuracyAlgo.scala | 11 + .../scala/org/apache/griffin/algo/Algo.scala | 11 + .../griffin/algo/StreamingAccuracyAlgo.scala | 27 + .../algo/StreamingAccuracyAlgo4Crawler.scala | 770 +++++++++++++++++++ .../apache/griffin/algo/core/AccuracyCore.scala | 92 +++ .../apache/griffin/cache/CacheProcesser.scala | 54 ++ .../apache/griffin/cache/CachedAccuData.scala | 23 + .../org/apache/griffin/cache/CachedData.scala | 27 + .../griffin/config/ConfigFileReader.scala | 20 + .../apache/griffin/config/ConfigReader.scala | 11 + .../griffin/config/params/AccuracyParam.scala | 11 + .../apache/griffin/config/params/AllParam.scala | 15 + .../griffin/config/params/ConfigParam.scala | 10 + .../griffin/config/params/DataAssetParam.scala | 14 + .../griffin/config/params/DqConfigParam.scala | 10 + .../apache/griffin/config/params/DqType.scala | 11 + .../griffin/config/params/DumpParam.scala | 14 + .../griffin/config/params/MappingParam.scala | 12 + .../config/params/ParseConfigParam.scala | 10 + .../griffin/config/params/ParseParam.scala | 12 + .../griffin/config/params/PrepParam.scala | 11 + .../griffin/config/params/RecorderParam.scala | 12 + .../griffin/config/params/RetryParam.scala | 12 + .../griffin/config/params/SampleParam.scala | 11 + .../config/params/SchemaFieldParam.scala | 20 + .../griffin/config/params/SparkParam.scala | 15 + .../griffin/config/params/TimeRangeParam.scala | 12 + .../griffin/config/params/TypeConfigParam.scala | 11 + .../org/apache/griffin/dump/DumpWrapper.scala | 21 + .../apache/griffin/dump/DumpWrapperInfo.scala | 32 + .../apache/griffin/dump/HdfsFileDumpUtil.scala | 37 + .../apache/griffin/prep/parse/DataParser.scala | 10 + .../griffin/prep/parse/Json2MapParser.scala | 59 ++ .../apache/griffin/record/HdfsRecorder.scala | 87 +++ .../apache/griffin/record/NullRecorder.scala | 20 + .../apache/griffin/record/PostRecorder.scala | 46 ++ .../org/apache/griffin/record/Recorder.scala | 19 + .../apache/griffin/record/RecorderFactory.scala | 48 ++ .../griffin/record/cleaner/CleanerFactory.scala | 31 + .../griffin/record/cleaner/HdfsCleaner.scala | 78 ++ .../griffin/record/result/AccuResult.scala | 18 + .../apache/griffin/record/result/Result.scala | 12 + .../org/apache/griffin/utils/DataTypeUtil.scala | 179 +++++ .../apache/griffin/utils/ExtractJsonUtil.scala | 205 +++++ .../org/apache/griffin/utils/HdfsUtil.scala | 57 ++ .../org/apache/griffin/utils/JsonUtil.scala | 33 + .../org/apache/griffin/utils/RestfulUtil.scala | 16 + .../griffin/utils/ValueListCombineUtil.scala | 198 +++++ .../src/test/resources/config.json | 11 + 
.../src/test/resources/input.msg | 1 + .../src/test/resources/out1.msg | 1 + .../src/test/resources/output.msg | 1 + .../griffin/config/ConfigFileReaderTest.scala | 21 + .../apache/griffin/config/ConfigReadTest.scala | 45 ++ .../griffin/prep/parse/Json2MapParserTest.scala | 49 ++ .../org/apache/griffin/test/DataParseTest.scala | 101 +++ 61 files changed, 3474 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/dependency-reduced-pom.xml ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/dependency-reduced-pom.xml b/griffin-streaming-model/dependency-reduced-pom.xml new file mode 100644 index 0000000..6fb4004 --- /dev/null +++ b/griffin-streaming-model/dependency-reduced-pom.xml @@ -0,0 +1,319 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.griffin</groupId> + <artifactId>griffin-crawler-model</artifactId> + <version>1.0-SNAPSHOT</version> + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer> + <mainClass>com.colobu.kafka.ConsumerExample</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>1.6.0</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>avro-mapred</artifactId> + <groupId>org.apache.avro</groupId> + </exclusion> + <exclusion> + <artifactId>chill_2.10</artifactId> + <groupId>com.twitter</groupId> + </exclusion> + <exclusion> + <artifactId>chill-java</artifactId> + <groupId>com.twitter</groupId> + </exclusion> + <exclusion> + <artifactId>xbean-asm5-shaded</artifactId> + <groupId>org.apache.xbean</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-client</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>spark-launcher_2.10</artifactId> + <groupId>org.apache.spark</groupId> + </exclusion> + <exclusion> + <artifactId>spark-network-common_2.10</artifactId> + <groupId>org.apache.spark</groupId> + </exclusion> + <exclusion> + <artifactId>spark-network-shuffle_2.10</artifactId> + <groupId>org.apache.spark</groupId> + </exclusion> + <exclusion> + <artifactId>spark-unsafe_2.10</artifactId> + <groupId>org.apache.spark</groupId> + </exclusion> + <exclusion> + <artifactId>jets3t</artifactId> + <groupId>net.java.dev.jets3t</groupId> + </exclusion> + <exclusion> + <artifactId>curator-recipes</artifactId> + <groupId>org.apache.curator</groupId> + </exclusion> + <exclusion> + <artifactId>javax.servlet</artifactId> + <groupId>org.eclipse.jetty.orbit</groupId> + </exclusion> + <exclusion> + <artifactId>commons-lang3</artifactId> + 
<groupId>org.apache.commons</groupId> + </exclusion> + <exclusion> + <artifactId>commons-math3</artifactId> + <groupId>org.apache.commons</groupId> + </exclusion> + <exclusion> + <artifactId>jsr305</artifactId> + <groupId>com.google.code.findbugs</groupId> + </exclusion> + <exclusion> + <artifactId>jul-to-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>jcl-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>compress-lzf</artifactId> + <groupId>com.ning</groupId> + </exclusion> + <exclusion> + <artifactId>RoaringBitmap</artifactId> + <groupId>org.roaringbitmap</groupId> + </exclusion> + <exclusion> + <artifactId>commons-net</artifactId> + <groupId>commons-net</groupId> + </exclusion> + <exclusion> + <artifactId>akka-remote_2.10</artifactId> + <groupId>com.typesafe.akka</groupId> + </exclusion> + <exclusion> + <artifactId>akka-slf4j_2.10</artifactId> + <groupId>com.typesafe.akka</groupId> + </exclusion> + <exclusion> + <artifactId>json4s-jackson_2.10</artifactId> + <groupId>org.json4s</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-server</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-core</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + <exclusion> + <artifactId>mesos</artifactId> + <groupId>org.apache.mesos</groupId> + </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> + <exclusion> + <artifactId>stream</artifactId> + <groupId>com.clearspring.analytics</groupId> + </exclusion> + <exclusion> + <artifactId>metrics-core</artifactId> + <groupId>io.dropwizard.metrics</groupId> + </exclusion> + <exclusion> + <artifactId>metrics-jvm</artifactId> + <groupId>io.dropwizard.metrics</groupId> + </exclusion> + <exclusion> + <artifactId>metrics-json</artifactId> + <groupId>io.dropwizard.metrics</groupId> + </exclusion> + <exclusion> + <artifactId>metrics-graphite</artifactId> + <groupId>io.dropwizard.metrics</groupId> + </exclusion> + <exclusion> + <artifactId>ivy</artifactId> + <groupId>org.apache.ivy</groupId> + </exclusion> + <exclusion> + <artifactId>oro</artifactId> + <groupId>oro</groupId> + </exclusion> + <exclusion> + <artifactId>tachyon-client</artifactId> + <groupId>org.tachyonproject</groupId> + </exclusion> + <exclusion> + <artifactId>pyrolite</artifactId> + <groupId>net.razorvine</groupId> + </exclusion> + <exclusion> + <artifactId>py4j</artifactId> + <groupId>net.sf.py4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>1.6.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_2.10</artifactId> + <version>1.6.0</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>parquet-hadoop-bundle</artifactId> + <groupId>com.twitter</groupId> + </exclusion> + <exclusion> + <artifactId>spark-sql_2.10</artifactId> + <groupId>org.apache.spark</groupId> + </exclusion> + <exclusion> + <artifactId>hive-exec</artifactId> + <groupId>org.spark-project.hive</groupId> + </exclusion> + <exclusion> + <artifactId>hive-metastore</artifactId> + <groupId>org.spark-project.hive</groupId> + </exclusion> + <exclusion> + <artifactId>avro</artifactId> + 
<groupId>org.apache.avro</groupId> + </exclusion> + <exclusion> + <artifactId>commons-httpclient</artifactId> + <groupId>commons-httpclient</groupId> + </exclusion> + <exclusion> + <artifactId>calcite-avatica</artifactId> + <groupId>org.apache.calcite</groupId> + </exclusion> + <exclusion> + <artifactId>calcite-core</artifactId> + <groupId>org.apache.calcite</groupId> + </exclusion> + <exclusion> + <artifactId>httpclient</artifactId> + <groupId>org.apache.httpcomponents</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-mapper-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>commons-codec</artifactId> + <groupId>commons-codec</groupId> + </exclusion> + <exclusion> + <artifactId>joda-time</artifactId> + <groupId>joda-time</groupId> + </exclusion> + <exclusion> + <artifactId>jodd-core</artifactId> + <groupId>org.jodd</groupId> + </exclusion> + <exclusion> + <artifactId>datanucleus-core</artifactId> + <groupId>org.datanucleus</groupId> + </exclusion> + <exclusion> + <artifactId>libthrift</artifactId> + <groupId>org.apache.thrift</groupId> + </exclusion> + <exclusion> + <artifactId>libfb303</artifactId> + <groupId>org.apache.thrift</groupId> + </exclusion> + <exclusion> + <artifactId>avro-mapred</artifactId> + <groupId>org.apache.avro</groupId> + </exclusion> + <exclusion> + <artifactId>jsr305</artifactId> + <groupId>com.google.code.findbugs</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>hamcrest-core</artifactId> + <groupId>org.hamcrest</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.10</artifactId> + <version>2.2.4</version> + <scope>test</scope> + </dependency> + </dependencies> + <properties> + <avro.version>1.7.7</avro.version> + <scala.binary.version>2.10</scala.binary.version> + <jackson.version>2.8.7</jackson.version> + <maven.compiler.source>1.8</maven.compiler.source> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <spark.version>1.6.0</spark.version> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> +</project> + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/pom.xml ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/pom.xml b/griffin-streaming-model/pom.xml new file mode 100644 index 0000000..ac96542 --- /dev/null +++ b/griffin-streaming-model/pom.xml @@ -0,0 +1,187 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.griffin</groupId> + <artifactId>griffin-crawler-model</artifactId> + <version>1.0-SNAPSHOT</version> + + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + + <spark.version>1.6.0</spark.version> + <scala.binary.version>2.10</scala.binary.version> + <avro.version>1.7.7</avro.version> + + <jackson.version>2.8.7</jackson.version> + <!--<javax.validation.version>1.1.0.Final</javax.validation.version>--> + 
<!--<hibernate.validator.version>5.4.0.Final</hibernate.validator.version>--> + <!--<javax.el.version>2.2.4</javax.el.version>--> + + </properties> + + <dependencies> + <!--<dependency>--> + <!--<groupId>org.apache.kafka</groupId>--> + <!--<artifactId>kafka_${scala.binary.version}</artifactId>--> + <!--<version>0.8.2.1</version>--> + <!--<exclusions>--> + <!--<exclusion>--> + <!--<artifactId>jmxri</artifactId>--> + <!--<groupId>com.sun.jmx</groupId>--> + <!--</exclusion>--> + <!--<exclusion>--> + <!--<artifactId>jmxtools</artifactId>--> + <!--<groupId>com.sun.jdmk</groupId>--> + <!--</exclusion>--> + <!--<exclusion>--> + <!--<artifactId>jms</artifactId>--> + <!--<groupId>javax.jms</groupId>--> + <!--</exclusion>--> + <!--<exclusion>--> + <!--<artifactId>junit</artifactId>--> + <!--<groupId>junit</groupId>--> + <!--</exclusion>--> + <!--</exclusions>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>junit</groupId>--> + <!--<artifactId>junit</artifactId>--> + <!--<version>3.8.1</version>--> + <!--<scope>test</scope>--> + <!--</dependency>--> + + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <!--<dependency>--> + <!--<groupId>org.apache.spark</groupId>--> + <!--<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>--> + <!--<version>${spark.version}</version>--> + <!--</dependency>--> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>org.scalaj</groupId> + <artifactId>scalaj-http_${scala.binary.version}</artifactId> + <version>2.3.0</version> + </dependency> + + <!--<dependency>--> + <!--<groupId>javax.validation</groupId>--> + <!--<artifactId>validation-api</artifactId>--> + <!--<version>${javax.validation.version}</version>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>org.hibernate</groupId>--> + <!--<artifactId>hibernate-validator</artifactId>--> + <!--<version>${hibernate.validator.version}</version>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>javax.el</groupId>--> + <!--<artifactId>javax.el-api</artifactId>--> + <!--<version>${javax.el.version}</version>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>org.glassfish.web</groupId>--> + <!--<artifactId>javax.el</artifactId>--> + <!--<version>${javax.el.version}</version>--> + <!--</dependency>--> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + 
<artifactId>scalatest_${scala.binary.version}</artifactId> + <version>2.2.4</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.colobu.kafka.ConsumerExample</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/resources/config.json ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/resources/config.json b/griffin-streaming-model/src/main/resources/config.json new file mode 100644 index 0000000..eaa6452 --- /dev/null +++ b/griffin-streaming-model/src/main/resources/config.json @@ -0,0 +1,190 @@ +{ + "spark": { + "app.name": "GriffinAccuStreamingApp", + "log.level": "INFO", + "config": { + "spark.task.maxFailures": 5, + "spark.streaming.kafka.maxRatePerPartition": 1000, + "spark.streaming.concurrentJobs": 4 + }, + "streaming.checkpoint.dir": "hdfs:///test/kafka/cp", + "streaming.batch.interval.seconds": 5, + "streaming.sample.interval.seconds": 5 + }, + + "type.config": { + "dq": "accuracy", + "dataAsset": { + "source": "kafka", + "target": "kafka" + } + }, + + "dataAsset": { + "source": { + "kafka.config": { + "bootstrap.servers": "10.9.246.187:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "crwaler-in", + "pre.process": { + "sample": { + "need.sample": true, + "ratio": 1 + }, + "parse": { + "in.type": "json", + "out.type": "map", + "config": { + "out.schema": [ + { + "name": "url", + "type": "string", + "default.value": "", + "extract.steps": ["json", ".seeds", "[*]", "json", ".url"] + }, + { + "name": "createdts", + "type": "bigint", + "default.value": "0", + "extract.steps": ["json", ".seeds", "[*]", "json", ".metadata", "json", ".tracker", ".crawlRequestCreateTS"] + } + ] + } + } + }, + "dump.config": { + "table.name": "cin", + "dir": "hdfs:///griffin/streaming/dump", + "schema": [ + { + "name": "url", + "type": "string", + "default.value": "" + }, + { + "name": "createdts", + "type": "bigint", + "default.value": "0" + } + ] + }, + "schema": [ + { + "name": "url", + "type": "string", + "default.value": "" + }, + { + "name": "createdts", + "type": "bigint", + "default.value": "0" + } + ] + }, + "target": { + "kafka.config": { + "bootstrap.servers": "10.9.246.187:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "crawler-out", + "pre.process": { + "parse": { + "in.type": "json", + "out.type": "map", + "config": { + "out.schema": [ + { + "name": "url", + "type": "string", + "default.value": "", + "extract.steps": ["json", ".groups", "[0]", ".attrsList", "[.name=URL]", ".values", "[0]"] + }, + { + 
"name": "createdts", + "type": "bigint", + "default.value": "0", + "extract.steps": ["json", ".groups", "[0]", ".attrsList", "[.name=CRAWLMETADATA]", ".values", "[0]", "json", ".tracker", ".crawlRequestCreateTS"] + } + ] + } + } + }, + "dump.config": { + "table.name": "crawlerout", + "dir": "hdfs:///griffin/streaming/dump", + "schema": [ + { + "name": "url", + "type": "string", + "default.value": "" + }, + { + "name": "createdts", + "type": "bigint", + "default.value": "0" + } + ] + }, + "schema": [ + { + "name": "url", + "type": "string", + "default.value": "" + }, + { + "name": "createdts", + "type": "bigint", + "default.value": "0" + } + ] + } + }, + + "dq.config": { + "accuracy": { + "match.valid.time": { + "begin": 0, + "end": 24, + "unit": "hour" + }, + "mapping": [ + { + "source.name": "url", + "target.name": "url", + "isPK": true + }, { + "source.name": "createdts", + "target.name": "createdts", + "isPK": true + } + ] + } + }, + + "recorder": { + "types": ["hdfs", "post"], + "metric.name": "nrt_accuracy_crawler", + "config": { + "hdfs.dir": "hdfs:///griffin/streaming/record/", + "post.url": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/", + "post.metric.name": "nrt_accuracy_crawler" + } + }, + + "cleaner": { + + }, + + "retry": { + "need.retry": true, + "next.retry.seconds": 300, + "interval.seconds": 60 + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties b/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties new file mode 100644 index 0000000..b54b5e4 --- /dev/null +++ b/griffin-streaming-model/src/main/resources/kafkaConsumerConfig.properties @@ -0,0 +1,2 @@ +bootstrap.servers = 10.9.246.187:9092 +group.id = \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala new file mode 100644 index 0000000..1ab4df8 --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala @@ -0,0 +1,71 @@ +package org.apache.griffin + +import org.apache.griffin.algo._ +import org.apache.griffin.config.ConfigFileReader +import org.apache.griffin.config.params._ +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{Logging, SparkConf, SparkContext} + +import scala.collection.JavaConverters._ + +object Application extends Logging { +//object Application { + + def main(args: Array[String]): Unit = { + if (args.length < 1) { + logError("Usage: class [<config-files>]") + sys.exit(-1) + } + + val Array(configFile) = args + + // read config file + val configReader = ConfigFileReader(configFile) +// val allParam = configReader.readConfig[AllParam] + val allParam = configReader.readConfig + + // spark config + val sparkParam = allParam.sparkParam + + // type config + val typeConfigParam = allParam.typeConfigParam + val dqType = DqType.parse(typeConfigParam.dqType) + val dataAssetTypeMap = typeConfigParam.dataAssetTypeMap + + // data 
assets + val dataAssetParamMap = allParam.dataAssetParamMap + + // validation of params for algo + val validation = dqType match { + case AccuracyType => { + (dataAssetTypeMap.contains("source") && dataAssetParamMap.contains("source")) && + (dataAssetTypeMap.contains("target") && dataAssetParamMap.contains("target")) + } + case _ => (dataAssetTypeMap.contains("source") && dataAssetParamMap.contains("source")) + } + if (!validation) { + logError("Config Error!") + sys.exit(-1) + } + + // choose algo +// val algo: Algo = dqType match { +// case AccuracyType => { +// dataAssetTypeMap.get("source") match { +// case Some("kafka") => StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) +// case _ => StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) // fixme: it's wrong type here +// } +// } +// case _ => { +// // fixme: it's wrong type here +// StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) +// } +// } + val algo: Algo = StreamingAccuracyAlgo4Crawler(allParam) + + algo.run + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala new file mode 100644 index 0000000..f09be1b --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/AccuracyAlgo.scala @@ -0,0 +1,11 @@ +package org.apache.griffin.algo + +import org.apache.griffin.config.params._ + + +trait AccuracyAlgo extends Algo { + + val sourceDataParam: DataAssetParam + val targetDataParam: DataAssetParam + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala new file mode 100644 index 0000000..d7f51b3 --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/Algo.scala @@ -0,0 +1,11 @@ +package org.apache.griffin.algo + +import org.apache.griffin.config.params._ + +trait Algo extends Serializable { + + val sparkParam: SparkParam + + def run(): Unit + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala new file mode 100644 index 0000000..f0f2023 --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo.scala @@ -0,0 +1,27 @@ +package org.apache.griffin.algo + +import org.apache.griffin.config.params._ +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} + +case class StreamingAccuracyAlgo(val sparkParam: SparkParam, + val 
sourceDataParam: DataAssetParam, + val targetDataParam: DataAssetParam + ) extends AccuracyAlgo { + + def run(): Unit = { + // create context +// val sparkConf = new SparkConf().setAppName(sparkParam.appName) +// sparkConf.setAll(sparkParam.config) +// +// val sc = new SparkContext(sparkConf) +// sc.setLogLevel(sparkParam.logLevel) +// val sqlContext = new HiveContext(sc) +// val ssc = new StreamingContext(sc, Seconds(sparkParam.interval)) +// ssc.checkpoint(sparkParam.cpDir) + + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala new file mode 100644 index 0000000..caa7ad5 --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/StreamingAccuracyAlgo4Crawler.scala @@ -0,0 +1,770 @@ +package org.apache.griffin.algo + +import java.util.Date + +import kafka.serializer.StringDecoder +import org.apache.griffin.algo.core.AccuracyCore +import org.apache.griffin.cache.{CacheProcesser, CachedAccuData} +import org.apache.griffin.config.params._ +import org.apache.griffin.dump._ +import org.apache.griffin.prep.parse.{DataParser, Json2MapParser} +import org.apache.griffin.record.{Recorder, RecorderFactory} +import org.apache.griffin.utils.HdfsUtil +import org.apache.spark.rdd.{RDD, EmptyRDD} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.streaming.dstream.InputDStream +import org.apache.spark.streaming.kafka.KafkaUtils +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.griffin.record.result.{AccuResult, Result} + + +case class StreamingAccuracyAlgo4Crawler( val allParam: AllParam + ) extends AccuracyAlgo { + + val sparkParam: SparkParam = allParam.sparkParam + val sourceDataParam: DataAssetParam = allParam.dataAssetParamMap.get("source").get + val targetDataParam: DataAssetParam = allParam.dataAssetParamMap.get("target").get + + // current dumping hour and minute + private var dumpMin: Long = 0L + + // current checked minute + private var checkMin: Long = 0L + + // next fire time + private var nextFireTime: Long = 0L + + + + def run(): Unit = { + val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, + ( ) => { + try { + createContext() + } catch { + case runtime: RuntimeException => { + throw runtime + } + } + }) + + ssc.start() + ssc.awaitTermination() + ssc.stop(stopSparkContext=true, stopGracefully=true) + } + + def createContext(): StreamingContext = { + // create context + val sparkConf = new SparkConf().setAppName(sparkParam.appName) + sparkConf.setAll(sparkParam.config) + + val sc = new SparkContext(sparkConf) + sc.setLogLevel(sparkParam.logLevel) + val sqlContext = new HiveContext(sc) + val interval = Seconds(sparkParam.batchInterval) + val ssc = new StreamingContext(sc, interval) + ssc.checkpoint(sparkParam.cpDir) + + // emptyRDD + val emptyRdd = sc.emptyRDD[(Product, (Map[String, Any], Map[String, Any]))] + + // source data direct stream + val sourceDStream = createDirectStream(ssc, sourceDataParam) + + // target data direct stream + val targetDStream = createDirectStream(ssc, targetDataParam) + + // accuracy param + 
val accuracyParam = allParam.dqConfigParam.accuracy + val matchValidTime = accuracyParam.matchValidTime + val timeRange = (getTimeMs(matchValidTime.begin, matchValidTime.unit), getTimeMs(matchValidTime.end, matchValidTime.unit)) + + val mapping = accuracyParam.mapping + // get mapping Keys and Values + val mappingKey = mapping.filter(_.isPK) + val mappingValue = mapping + + // retry param + val retryParam = allParam.retryParam + val nextRetryInterval = retryParam.nextRetryInterval * 1000 + val retryPollInterval = retryParam.interval * 1000 + val retryInterval = Seconds(retryParam.interval) + val cacheProcesser = CacheProcesser(retryParam) + + // recorder + val recorderFactory = RecorderFactory(allParam.recorderParam) + + //--- 1. target part --- + + // target data from kafka (crawler output) + val targetDumpConfigParam = targetDataParam.dumpConfigParam + val targetPrepParam = targetDataParam.prepParam + val targetSchema = targetDataParam.schema + + // target parser + val targetParser = Json2MapParser(targetPrepParam.parseParam.configParam) + + // create hive table for target data + val targetDumpWrapper = DumpWrapper(List[DumpWrapperInfo]()) + val targetPartitionDef = List[(String, String)](("hr", "bigint"), ("min", "bigint")) + val (targetCreateTableSql, targetDumpDir) = genCreateTableSql(targetDumpConfigParam, targetDumpWrapper, targetPartitionDef) + val targetTableName = targetDumpConfigParam.tableName +// val fieldSep = targetDumpConfigParam.fieldSep +// val lineSep = targetDumpConfigParam.lineSep + val targetFieldSep = "," + val targetLineSep = "\n" + sqlContext.sql(targetCreateTableSql) + + // target dstream dump process + targetDStream.foreachRDD((rdd, time) => { + val ms = time.milliseconds + val min = getTimeToUnit(ms, "min") + val hour = getTimeToUnit(ms, "hour") + val partitions = List[(String, Any)](("hr", hour), ("min", min)) + + val partitionPath = genPartitionHdfsPath(partitions) + val path = s"${targetDumpDir}/${partitionPath}/${ms}" + + // parse each message + val recordRdd: RDD[String] = rdd.flatMap { kv => + val msg = kv._2 + + // parse rdd msg + val res: Seq[Map[String, Any]] = targetParser.parse(msg) + + // the sequence is the same with schema sequence, no need to sort here + val records = res.map { r => + r.values.mkString(targetFieldSep) + } + + records + } + + if (!recordRdd.isEmpty) { + + // save to hdfs + HdfsFileDumpUtil.dump(path, recordRdd, targetLineSep) + + // update hive + val partitionSql = genAddPartitionSql(targetTableName, partitions) + sqlContext.sql(partitionSql) + } + + // update dumpHourMin + updateDumpMin(min) + }) + + //--- 2. 
source part --- + + // source data from kafka (crawler input) + val sourceDumpConfigParam = sourceDataParam.dumpConfigParam + val sourcePrepParam = sourceDataParam.prepParam + val sourceSchema = sourceDataParam.schema + val sourceSampleParam = sourcePrepParam.sampleParam + + // source parser + val sourceParser = Json2MapParser(sourcePrepParam.parseParam.configParam) + + // create hive table for source data + val sourceDumpWrapper = DumpWrapper(List[DumpWrapperInfo](TimeGroupInfo, NextFireTimeInfo, MismatchInfo, ErrorInfo)) + val sourcePartitionDef = List[(String, String)](("firetime", "bigint")) + val (sourceCreateTableSql, sourceDumpDir) = genCreateTableSql(sourceDumpConfigParam, sourceDumpWrapper, sourcePartitionDef) + val sourceTableName = sourceDumpConfigParam.tableName + val sourceFieldSep = "," + val sourceLineSep = "\n" + sqlContext.sql(sourceCreateTableSql) + + // get source dump keys + val sourceDumpValueKeys = sourceDumpConfigParam.schema.map(_.name) + val sourceDumpInfoKeys = sourceDumpWrapper.wrapKeys + + // source dstream parse and compare process + val sampleInterval = Seconds(sparkParam.sampleInterval) + sourceDStream.foreachRDD((rdd, time) => { + val ms = time.milliseconds + + val batchTime = new Date() + println(s"++++++ ${batchTime} ++++++") + + val curMin = getTimeToUnit(ms, "min") + + // get next fire time + val cft = getNextFireTime + val (nft, fireNow, updateHive) = if (cft <= 0) { + initNextFireTime(ms, nextRetryInterval, retryPollInterval) + (getNextFireTime, false, true) + } else if (cft <= ms) { + updateNextFireTime(nextRetryInterval) + (getNextFireTime, true, true) + } else (cft, false, false) + + // get check min + val icm = getCheckMin + val curCheckMin = if (icm <= 0) { + updateCheckMin(curMin) + getCheckMin + } else icm + + // dump path + val partitions = List[(String, Any)](("firetime", nft)) + val partitionPath = genPartitionHdfsPath(partitions) + val dumpPath = s"${sourceDumpDir}/${partitionPath}/${ms}" + val dumpRetryPath = s"${sourceDumpDir}/${partitionPath}/${ms}.r" + + // update hive + if (updateHive) { + val partitionSql = genAddPartitionSql(sourceTableName, partitions) + sqlContext.sql(partitionSql) + } + + // sample time process + if (time.isMultipleOf(sampleInterval)) { + val recorders = recorderFactory.getRecorders(ms) + + // start + recorders.foreach(_.start) + + // sample data + val sampleRdd = if (sourceSampleParam.needSample) { + sampleFunc(rdd, sourceSampleParam.ratio) + } else rdd + + val sampleCount = sampleRdd.count + println(s"===== sampleCount: ${sampleCount} =====") + + // parse data + if (sampleRdd.isEmpty) { + // empty source data, do nothing here + recorders.foreach(_.info(ms, "empty source data")) + } else { + // parse data + val parseRdd: RDD[Map[String, Any]] = sampleRdd.flatMap { kv => + val msg = kv._2 + // parse msg + sourceParser.parse(msg) + } + + val parseCount = parseRdd.count + println(s"===== parseCount: ${parseCount} =====") + + // get source data + val sourceRdd: RDD[(Product, Map[String, Any])] = parseRdd.map { row => + val keys: List[AnyRef] = mappingKey.map { mp => + row.get(mp.sourceName) match { + case Some(v) => v.asInstanceOf[AnyRef] + case None => null + } + } + val values: Map[String, Any] = mappingValue.map { mp => + val v = row.get(mp.sourceName) match { + case Some(v) => v + case None => null + } + (mp.sourceName, v) + }.toMap + (toTuple(keys) -> values) + } + val sourceCount = sourceRdd.count + + println(s"===== first time sourceCount: ${sourceCount} =====") + + // wrap source data + val sourceWrappedRdd: 
RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceRdd.map { r => + val wrappedData = Map[String, Any]( + (TimeGroupInfo.key -> ms), + (NextFireTimeInfo.key -> nft), + (MismatchInfo.key -> ""), + (ErrorInfo.key -> "") + ) + (r._1, (r._2, wrappedData)) + } + + val (result, missRdd, matchRdd) = accu(curMin, sourceWrappedRdd) + + val missingRddCount1 = missRdd.count + println(s"===== missingRddCount1: ${missingRddCount1} =====") + + // record result + recorders.foreach(_.accuracyResult(ms, result)) + + val missStrings = missRdd.map { row => + val (key, (value, info)) = row + s"${value} [${info.getOrElse(MismatchInfo.key, "unknown")}]" + }.collect() + + // record missing records + recorders.foreach(_.accuracyMissingRecords(missStrings)) + + // cache + val cacheData = CachedAccuData() + cacheData.result = result + cacheData.curTime = ms + cacheData.deadTime = timeRange._2 + ms + cacheProcesser.cache(ms, cacheData) + + // dump mismatch data to hive + if (!missRdd.isEmpty) { + // parse each record + val dumpMissingRdd: RDD[String] = missRdd.map { kv => + val (_, (dt, inf)) = kv + val values = getDumpValues(dt, sourceDumpValueKeys) + val infos = getDumpValues(inf, sourceDumpInfoKeys) + (values ::: infos).mkString(sourceFieldSep) + } + + // save to hdfs + HdfsFileDumpUtil.dump(dumpPath, dumpMissingRdd, sourceLineSep) + + } + + } + + // finish + recorders.foreach(_.finish) + + } + + // retry + if (fireNow) { + + val t1 = new Date().getTime + println("===== retrying begin =====") + + // get current path + val partitions = List[(String, Any)](("firetime", cft)) + val partitionPath = genPartitionHdfsPath(partitions) + val dumpedDirPath = s"${sourceDumpDir}/${partitionPath}" + + println(s"===== nft: ${nft} =====") + println(s"===== cft: ${cft} =====") + println(s"===== dumpedDirPath: ${dumpedDirPath} =====") + + val tt1 = new Date().getTime + + // get missing data from hive + val partitionsRange = List(("firetime", cft)) + val selectSql = genSelectSql(sourceTableName, partitionsRange) + val missingDataFrame: DataFrame = sqlContext.sql(selectSql) + + val tt2 = new Date().getTime + println(s"--------------- get source data from hive time: ${tt2 - tt1} ---------------") + + // get missing data + val missingSourceWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = missingDataFrame.map { row => + val keys: List[AnyRef] = mappingKey.map { mp => + row.getAs[Any](mp.sourceName).asInstanceOf[AnyRef] + } + val values: Map[String, Any] = sourceDumpValueKeys.map { key => + val v = row.getAs[Any](key) + (key, v) + }.toMap + val infos: Map[String, Any] = sourceDumpInfoKeys.map { key => + val v = row.getAs[Any](key) + (key, v) + }.toMap + (toTuple(keys) -> (values, infos)) + } + + val tt3 = new Date().getTime + println(s"--------------- wrap source data time: ${tt3 - tt2} ---------------") + + if (!missingSourceWrappedRdd.isEmpty) { + val sourceCount = missingSourceWrappedRdd.count + println(s"===== rerty sourceCount: ${sourceCount} =====") + + val (retryResult, stillMissingRdd, matchRdd) = accu(curCheckMin, missingSourceWrappedRdd) + + val tt4 = new Date().getTime + println(s"--------------- accu time: ${tt4 - tt3} ---------------") + + val missingRddCount2 = stillMissingRdd.count + println(s"===== missingRddCount2: ${missingRddCount2} =====") + + val matchRddCount2 = matchRdd.count + println(s"===== matchRddCount2: ${matchRddCount2} =====") + + // filter still need saving rdd + val savingRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = stillMissingRdd.filter { row => + val (key, 
(value, info)) = row + info.get(TimeGroupInfo.key) match { + case Some(t: Long) => { + cacheProcesser.getCache(t) match { + case Some(cache) => true + case _ => false + } + } + case _ => false + } + } + + def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] + ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = { + rdd.flatMap { row => + val (key, (value, info)) = row + val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeGroupInfo.key) match { + case Some(t: Long) => Some((t, row)) + case _ => None + } + b + } + } + + // get missing results + val missingResults = reorgByTimeGroup(stillMissingRdd) + val missingResultsCount = missingResults.count + println(s"===== missingResultsCount: ${missingResultsCount} =====") + + // get matched results + val matchResults = reorgByTimeGroup(matchRdd) + val matchResultsCount = matchResults.count + println(s"===== matchResultsCount: ${matchResultsCount} =====") + + val groupedResults = missingResults.cogroup(matchResults) + + val tt5 = new Date().getTime + println(s"--------------- cogroup time: ${tt5 - tt4} ---------------") + +// val groupedMatchResults: RDD[(Long, Iterable[(Product, (Map[String, Any], Map[String, Any]))])] = matchResultsCount.groupByKey() + + val groupedResultsCount = groupedResults.count + println(s"===== groupedResultsCount: ${groupedResultsCount} =====") + + groupedResults.foreach(row => println(s"=== ${row._1}: ${row._2._1.size}, ${row._2._2.size} === ${ms} ===")) + + val retry_time = new Date().getTime + + // record + groupedResults.foreach { rslt => + val (t, (missRes, matchRes)) = rslt + val curRecorders = recorderFactory.getRecorders(t) + + cacheProcesser.getCache(t) match { + case Some(cache) => { + val matchCount = matchRes.size + // need to update result + if (matchCount > 0) { + val accuCache = cache.asInstanceOf[CachedAccuData] + val delta = AccuResult(missRes.size, (missRes.size + matchRes.size)) + val oldResult = accuCache.result + val newResult = accuCache.result.updateResult(delta) + + // different result + if (newResult.differsFrom(oldResult)) { + accuCache.result = newResult + + // record result + curRecorders.foreach(_.accuracyResult(ms, newResult)) + + val missStrings = missRes.map { row => + val (key, (value, info)) = row + s"${value} [${info.getOrElse(MismatchInfo.key, "unknown")}]" + } + // record missing records + curRecorders.foreach(_.accuracyMissingRecords(missStrings)) + + curRecorders.foreach(_.recordTime(ms, retry_time - t1)) + println(s"----- record here: [${ms}, (${t}: ${newResult})] -----") + } + } + } + case _ => { + println(s"=== no cache of ${t}, need to clear ===") + } + } + } + + val savingRddCount = savingRdd.count + println(s"===== savingRddCount: ${savingRddCount} =====") + + val tt6 = new Date().getTime + println(s"--------------- record and get saving data time: ${tt6 - tt5} ---------------") + + // caches refresh + cacheProcesser.refresh(ms) + + // dump mismatch data to hive + if (!savingRdd.isEmpty) { + // parse each record + val dumpMissingRdd: RDD[String] = savingRdd.map { kv => + val (_, (dt, inf)) = kv + val values = getDumpValues(dt, sourceDumpValueKeys) + val infos = getDumpValues(inf, sourceDumpInfoKeys) + (values ::: infos).mkString(sourceFieldSep) + } + + // save to hdfs + HdfsFileDumpUtil.dump(dumpRetryPath, dumpMissingRdd, sourceLineSep) + + val tt7 = new Date().getTime + println(s"--------------- dump saving data time: ${tt7 - tt6} ---------------") + } + } else { + println("===== retry source data empty 
=====") + } + + val tt7 = new Date().getTime + + // remove current calculated dumped datas + HdfsUtil.deleteHdfsPath(dumpedDirPath) + + val tt8 = new Date().getTime + println(s"--------------- remove outtime data time: ${tt8 - tt7} ---------------") + + val t2 = new Date().getTime + println(s"===== retrying end [using ${t2 - t1} ms] =====") + } + + + }) + + def accu(curMin: Long, sourceWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] + ): (AccuResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = { + val t1 = new Date().getTime + + // get dump min + val dumpedMin = getDumpedMin + + val mRange = (curMin, dumpedMin) + println(s"===== mRange: ${mRange} =====") + + val ret = if (dumpReady(curMin, dumpedMin)) { + // get target data from hive + val minRange = (curMin, dumpedMin) + val partitionsRange = List(("min", minRange)) + val selectSql = genSelectSql(targetTableName, partitionsRange) + val targetDataFrame: DataFrame = sqlContext.sql(selectSql) + + // get target data + val targetWrappedRdd: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataFrame.map { row => + val keys: List[AnyRef] = mappingKey.map { mp => + row.getAs[Any](mp.targetName).asInstanceOf[AnyRef] + } + val values: Map[String, Any] = mappingValue.map { mp => + val v = row.getAs[Any](mp.targetName) + (mp.targetName, v) + }.toMap + (toTuple(keys) -> (values, Map[String, Any]())) + } + + val targetCount = targetWrappedRdd.count + println(s"===== targetCount: ${targetCount} =====") + + val (result, missRdd, matchRdd) = if (targetWrappedRdd.isEmpty) { + val sourceCount = sourceWrappedRdd.count + (AccuResult(sourceCount, sourceCount), sourceWrappedRdd, emptyRdd) + } else { + // cogroup source and target + val allKvs = sourceWrappedRdd.cogroup(targetWrappedRdd) + + // accuracy algorithm calculation + val (accuResult, missingRdd, matchingRdd) = AccuracyCore.accuracy(allKvs) + + println("source count: " + accuResult.totalCount + " missed count : " + accuResult.missCount) + + val missingCount = missingRdd.count + println(s"===== missingCount: ${missingCount} =====") + + (accuResult, missingRdd, matchingRdd) + } + + // update check min + updateCheckMin(dumpedMin + 1) + + (result, missRdd, matchRdd) + + } else { + println("===== dump data not ready =====") + val sourceCount = sourceWrappedRdd.count + (AccuResult(sourceCount, sourceCount), sourceWrappedRdd, emptyRdd) + } + + val t2 = new Date().getTime + + println(s"===== accu using time ${t2 - t1} ms =====") + + ret + } + + ssc + + } + + def createDirectStream(ssc: StreamingContext, dataAssetParam: DataAssetParam): InputDStream[(String, String)] = { + val kafkaParam = dataAssetParam.kafkaConfig + val topics = dataAssetParam.topics.split(",").toSet + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, + kafkaParam, + topics + ) + } + + def genCreateTableSql(dumpConfigParam: DumpParam, dumpWrapper: DumpWrapper, partitionDef: List[(String, String)]): (String, String) = { + val tableName = dumpConfigParam.tableName + val targetDumpDir = s"${dumpConfigParam.dir}/${tableName}" + + val schema = dumpConfigParam.schema + val schemaCols = schema.map { field => + (field.name, field.fieldType) + } + + val wrapperCols = dumpWrapper.wrapSchema + + val colsSql = (schemaCols ++ wrapperCols).map(col => (s"`${col._1}` ${col._2}")).mkString(", ") + +// val partitions = dumpConfigParam.partitions +// val partitionCols = partitions.map { field => +// (field.name, field.fieldType) +// 
}.toMap +// val partitionsSql = partitionCols.map(prtn => (s"`${prtn._1}` ${prtn._2}")).mkString(", ") + val partitionsSql = partitionDef.map(prtn => (s"`${prtn._1}` ${prtn._2}")).mkString(", ") + + val sql = + s"""CREATE EXTERNAL TABLE IF NOT EXISTS `${tableName}` + |(${colsSql}) PARTITIONED BY (${partitionsSql}) + |ROW FORMAT DELIMITED + |FIELDS TERMINATED BY ',' + |LINES TERMINATED BY '\n' + |STORED AS TEXTFILE + |LOCATION '${targetDumpDir}'""".stripMargin + (sql, targetDumpDir) + } + + + def genPartitionHdfsPath(partitions: List[(String, Any)]): String = { + partitions.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/") + } + + def genAddPartitionSql(tableName: String, partitions: List[(String, Any)]): String = { + val partitionSql = partitions.map(prtn => (s"`${prtn._1}`=${prtn._2}")).mkString(", ") + val sql = s"""ALTER TABLE `${tableName}` ADD IF NOT EXISTS PARTITION (${partitionSql})""" + sql + } + + def genDropPartitionSql(tableName: String, partitions: List[(String, Any)]): String = { + val partitionSql = partitions.map(prtn => (s"`${prtn._1}`=${prtn._2}")).mkString(", ") + val sql = s"""ALTER TABLE `${tableName}` DROP IF EXISTS PARTITION (${partitionSql})""" + sql + } + + def genSelectSql(tableName: String, partitionsRange: List[(String, Any)]): String = { + val whereClause = partitionsRange match { + case Nil => "" + case _ => { + val clause = partitionsRange.map { prtn => + prtn match { + case (name, range: (Any, Any)) => s"""`${name}` BETWEEN '${range._1}' and '${range._2}'""" + case (name, value) => s"""`${name}`='${value}'""" + case _ => "" + } + }.filter(_.nonEmpty).mkString(" AND ") + s"WHERE ${clause}" + } + } + val sql = s"""SELECT * FROM `${tableName}` ${whereClause}""" + sql + } + + def sampleFunc[T](src: RDD[T], ratio: Double, count: Int = 0): RDD[T] = { + if (src.isEmpty) { + println("===== empty source =====") + src + } else { + val sample = src.sample(false, ratio, System.nanoTime) + if (sample.isEmpty) { + if (count < 5) { + sampleFunc(src, ratio, count + 1) + } else { + println("===== empty sample, get source =====") + src + } + } else sample + } + } + + + + def getTimeMs(t: Int, unit: String): Long = { + val lt = t.toLong + unit match { + case "ms" => lt + case "sec" => lt * 1000 + case "min" => lt * 60 * 1000 + case "hour" => lt * 60 * 60 * 1000 + case "day" => lt * 24 * 60 * 60 * 1000 + case _ => lt * 60 * 1000 + } + } + + def getTimeToUnit(lt: Long, unit: String): Long = { + unit match { + case "ms" => lt + case "sec" => lt / 1000 + case "min" => lt / (60 * 1000) + case "hour" => lt / (60 * 60 * 1000) + case "day" => lt / (24 * 60 * 60 * 1000) + case _ => lt / (60 * 1000) + } + } + + def updateDumpMin(min: Long): Unit = { + this.dumpMin = min + } + + def getDumpedMin(): Long = { + this.dumpMin - 1 + } + + def dumpReady(beginMin: Long, dumpedMin: Long): Boolean = { + beginMin <= dumpedMin + } + + + def toTuple[A <: AnyRef](as: Seq[A]): Product = { + val tupleClass = Class.forName("scala.Tuple" + as.size) + tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] + } + + def initNextFireTime(t: Long, nextInterval: Long, precInterval: Long): Unit = { + val nt = nextInterval + t + this.nextFireTime = nt / precInterval * precInterval + } + + def updateNextFireTime(nextInterval: Long): Unit = { + this.nextFireTime += nextInterval + } + + def getNextFireTime(): Long = { + this.nextFireTime + } + + def getDumpValues(data: Map[String, Any], keys: List[String]): List[String] = { + keys.map { k => + data.get(k) match { + case Some(v) => if (v != 
null) v.toString else "" + case _ => "" + } + } + } + + def getStringFromSchema(datas: (Map[String, Any], Map[String, Any]), dumpConfigParam: DumpParam, dumpWrapper: DumpWrapper): List[Option[_]] = { + val (dt, info) = datas + val schema = dumpConfigParam.schema + val schemaValues = schema.map { s => + dt.get(s.name) + } + val infoValues = dumpWrapper.wrapValues(info) + schemaValues ::: infoValues + } + + def getCheckMin(): Long = { + this.checkMin + } + + def updateCheckMin(min: Long): Unit = { + this.checkMin = min + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala new file mode 100644 index 0000000..b7a7c8e --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/algo/core/AccuracyCore.scala @@ -0,0 +1,92 @@ +package org.apache.griffin.algo.core + +import org.apache.griffin.dump._ +import org.apache.griffin.record.result.AccuResult +import org.apache.spark.rdd.RDD + + +object AccuracyCore { + + type T = Map[String, Any] + + def accuracy(allkvs: RDD[(Product, (Iterable[(T, T)], Iterable[(T, T)]))] + ): (AccuResult, RDD[(Product, (T, T))], RDD[(Product, (T, T))]) = { + val result: RDD[(Long, Long, List[(Product, (T, T))], List[(Product, (T, T))])] = allkvs.map { kv => + val (k, (srcData, tgtData)) = kv +// val srcCount = srcData.size + val tgtCount = tgtData.size + val tgt: Set[T] = tgtData.map(_._1).toSet // only get value + + // result: (missCount, matchCount, missDataList, matchDataList) + val rslt = srcData.foldLeft((0L, 0L, List[(Product, (T, T))](), List[(Product, (T, T))]())) { (r, mapPair) => + val (valueMap, infoMap) = mapPair + if (tgt.contains(valueMap)) { + val newInfoMap = infoMap + (MismatchInfo.key -> "matched") + val matchItem = (k, (valueMap, newInfoMap)) + (r._1, r._2 + 1, r._3, r._4 :+ matchItem) + } else { + val newInfoMap = if (tgtCount == 0) { + infoMap + (MismatchInfo.key -> "no target") + } else if (tgtCount == 1) { + infoMap + (MismatchInfo.key -> ("target data: " + tgt.head.toString)) + } else { + infoMap + (MismatchInfo.key -> "not match") + } + val missItem = (k, (valueMap, newInfoMap)) + (r._1 + 1, r._2, r._3 :+ missItem, r._4) + } + } + rslt + } + + val missRdd = result.flatMap(_._3) + val matchRdd = result.flatMap(_._4) + + def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = { + (cnt._1 + rcd._1, cnt._2 + rcd._2) + } + def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = { + (c1._1 + c2._1, c1._2 + c2._2) + } + val countPair = result.aggregate((0L, 0L))(seq, comb) + + (AccuResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd) + } + +// def accuracy(allkvs: RDD[(Product, (Iterable[Map[String, Any]], Iterable[Map[String, Any]]))]): (AccuResult, List[String], RDD[(Product, Map[String, Any])]) = { +// val result: RDD[((Long, Long), List[String], List[(Product, Map[String, Any])])] = allkvs.map { kv => +// val (k, (srcData, tgtData)) = kv +// val srcCount = srcData.size +// val tgtCount = tgtData.size +// val tgt = tgtData.toSet +// val rslt = srcData.foldLeft((0L, List[String](), List[(Product, Map[String, Any])]())) { (r, mp) => +// if (tgt.contains(mp)) { +// r +// } else { +// val mp1 = if (tgtCount == 0) { +// mp + 
("_error_" -> "no target") +// } else if (tgtCount == 1) { +// mp + ("_error_" -> tgt.head.toString) +// } else { +// mp + ("_error_" -> "not match") +// } +// (r._1 + 1, mp1.toString :: r._2, (k, mp) :: r._3) +// } +// } +// ((rslt._1, srcCount.toLong), rslt._2, rslt._3) +// } +// +// val missingRdd = result.flatMap(_._3) +// +// def seq(cnt: (Long, Long, List[String]), rcd: ((Long, Long), List[String], Any)): (Long, Long, List[String]) = { +// (cnt._1 + rcd._1._1, cnt._2 + rcd._1._2, cnt._3 ::: rcd._2) +// } +// def comb(c1: (Long, Long, List[String]), c2: (Long, Long, List[String])): (Long, Long, List[String]) = { +// (c1._1 + c2._1, c1._2 + c2._2, c1._3 ::: c2._3) +// } +// val missingCount = result.aggregate((0L, 0L, List[String]()))(seq, comb) +// +// (AccuResult(missingCount._1, missingCount._2), missingCount._3, missingRdd) +// } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala new file mode 100644 index 0000000..c70ce77 --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CacheProcesser.scala @@ -0,0 +1,54 @@ +package org.apache.griffin.cache + +import org.apache.griffin.config.params.RetryParam + +import scala.collection.mutable.{Map => MutableMap} + +case class CacheProcesser(retryParam: RetryParam) { + + val cacheGroup: MutableMap[Long, CachedData] = MutableMap() + + val needRetry = retryParam.needRetry + val nextRetryInterval = retryParam.nextRetryInterval * 1000 + + def cache(timeStamp: Long, cacheData: CachedData): Unit = { + if (!cacheData.cacheEnd) { + cacheGroup.get(timeStamp) match { + case Some(cd) => { + println("unnecessary to cache") + } + case _ => { + cacheGroup += (timeStamp -> cacheData) + println(s"add cache ${timeStamp}: ${cacheData.result}") + } + } + } else { + println("unnecessary to cache") + } + } + +// def process(time: Long): Unit = { +// val curCacheGroup = cacheGroup.toMap +// val fireCaches = curCacheGroup.filter { pair => (time >= pair._2.nextFireTime) } +// println(s"=== retry group count: ${fireCaches.size} ===") +// fireCaches.foreach { pair => +// val (_, cacheData) = pair +// println(s"=== retry: ${pair._1} ===") +// cacheData.recalcAndCache(time) +// cacheData.nextFireTime = time + nextRetryInterval +// } +// } + + def refresh(t: Long): Unit = { + val curCacheGroup = cacheGroup.toMap + curCacheGroup.foreach(_._2.curTime = t) + val deadCache = curCacheGroup.filter(_._2.cacheEnd) + println(s"=== dead cache group count: ${deadCache.size} ===") + deadCache.keySet.foreach(cacheGroup -= _) + } + + def getCache(timeStamp: Long): Option[CachedData] = { + cacheGroup.get(timeStamp) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala new file mode 100644 index 0000000..0d939e5 --- /dev/null +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala @@ -0,0 +1,23 @@ 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala
new file mode 100644
index 0000000..0d939e5
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedAccuData.scala
@@ -0,0 +1,23 @@
+package org.apache.griffin.cache
+
+import org.apache.spark.rdd.RDD
+import org.apache.griffin.record.result.AccuResult
+
+case class CachedAccuData() extends CachedData {
+
+  type T = AccuResult
+
+//  var validMinRange: (Long, Long) = _
+
+//  var proc: (Long) => (T, RDD[(Product, Map[String, Any])], (Long, Long)) => (T, RDD[(Product, Map[String, Any])], (Long, Long)) = _
+
+//  def recalcAndCache(t: Long): Unit = {
+//    val res = proc(t)(result, rdd, validMinRange)
+//    unpersist
+//    result = res._1
+//    rdd = res._2
+//    validMinRange = res._3
+//    persist
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala
new file mode 100644
index 0000000..64474f8
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/cache/CachedData.scala
@@ -0,0 +1,27 @@
+package org.apache.griffin.cache
+
+import org.apache.spark.rdd.RDD
+import org.apache.griffin.record.result.Result
+
+
+trait CachedData {
+
+  type T <: Result
+
+  var result: T = _
+
+  var curTime: Long = _
+  var deadTime: Long = _
+//  var rdd: RDD[(Product, Map[String, Any])] = _
+
+//  var nextFireTime: Long = _
+
+//  def persist(): Unit = { rdd.cache }
+//  def unpersist(): Unit = { rdd.unpersist() }
+
+//  def recalcAndCache(t: Long): Unit
+
+  def cacheEnd(): Boolean = {
+    (curTime > deadTime) || (result != null && !result.needCalc)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala
new file mode 100644
index 0000000..f7e6fdf
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigFileReader.scala
@@ -0,0 +1,20 @@
+package org.apache.griffin.config
+
+import org.apache.griffin.config.params._
+import org.apache.griffin.utils.JsonUtil
+
+case class ConfigFileReader(file: String) extends ConfigReader {
+
+//  def readConfig[T <: ConfigParam](implicit m : Manifest[T]): T = {
+//    val lines = scala.io.Source.fromFile(file).mkString
+//    val param = JsonUtil.fromJson[T](lines)
+//    param
+//  }
+
+  def readConfig: AllParam = {
+    val lines = scala.io.Source.fromFile(file).mkString
+    val param = JsonUtil.fromJson2AllParam(lines)
+    param
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala
new file mode 100644
index 0000000..5d87976
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/ConfigReader.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config
+
+import org.apache.griffin.config.params.ConfigParam
+
+trait ConfigReader extends Serializable {
+
+//  def readConfig[T <: ConfigParam](implicit m : Manifest[T]): T
+
+  def readConfig: ConfigParam
+
+}
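Typical driver-side usage of these readers, sketched with a placeholder config path; readConfig delegates the Jackson binding to JsonUtil.fromJson2AllParam, and the recorderParam access simply illustrates the resulting parameter tree:

import org.apache.griffin.config.ConfigFileReader
import org.apache.griffin.config.params.AllParam

object ConfigSketch {
  def main(args: Array[String]): Unit = {
    // the path is a placeholder for a deployed config file
    val reader = ConfigFileReader("conf/config.json")
    val allParam: AllParam = reader.readConfig
    if (allParam.validate()) {
      println(s"recorder types: ${allParam.recorderParam.types}")
    }
  }
}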
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala
new file mode 100644
index 0000000..f4a263d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AccuracyParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class AccuracyParam( @JsonProperty("mapping") mapping: List[MappingParam],
+                          @JsonProperty("match.valid.time") matchValidTime: TimeRangeParam
+                        ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala
new file mode 100644
index 0000000..7c7be6d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/AllParam.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class AllParam( @JsonProperty("spark") sparkParam: SparkParam,
+                     @JsonProperty("type.config") typeConfigParam: TypeConfigParam,
+                     @JsonProperty("dataAsset") dataAssetParamMap: Map[String, DataAssetParam],
+                     @JsonProperty("dq.config") dqConfigParam: DqConfigParam,
+                     @JsonProperty("recorder") recorderParam: RecorderParam,
+                     @JsonProperty("retry") retryParam: RetryParam
+                   ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala
new file mode 100644
index 0000000..f97d071
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ConfigParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.config.params
+
+
+trait ConfigParam extends Serializable {
+
+  def merge(cp: ConfigParam): Unit = {}
+
+  def validate(): Boolean = true
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala
new file mode 100644
index 0000000..3647d8d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DataAssetParam.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class DataAssetParam( @JsonProperty("kafka.config") kafkaConfig: Map[String, String],
+                           @JsonProperty("topics") topics: String,
+                           @JsonProperty("pre.process") prepParam: PrepParam,
+                           @JsonProperty("dump.config") dumpConfigParam: DumpParam,
+                           @JsonProperty("schema") schema: List[SchemaFieldParam]
+                         ) extends ConfigParam {
+
+}
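As a sketch of how the @JsonProperty names bind, a single dataAsset entry can be deserialized with jackson-module-scala; the JSON values are illustrative, and the omitted sections (pre.process, dump.config, schema) come back null in this sketch:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.griffin.config.params.DataAssetParam

object DataAssetSketch {
  def main(args: Array[String]): Unit = {
    val json =
      """{
        |  "kafka.config": { "bootstrap.servers": "localhost:9092", "group.id": "griffin" },
        |  "topics": "source-topic"
        |}""".stripMargin

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val asset = mapper.readValue(json, classOf[DataAssetParam])
    println(asset.topics)        // source-topic
    println(asset.kafkaConfig)   // Map(bootstrap.servers -> localhost:9092, group.id -> griffin)
  }
}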
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala
new file mode 100644
index 0000000..789bbb8
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqConfigParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class DqConfigParam( @JsonProperty("accuracy") accuracy: AccuracyParam
+                        ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala
new file mode 100644
index 0000000..c524614
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DqType.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+sealed class DqType(val tp: String)
+
+case object AccuracyType extends DqType("accuracy")
+case object ProfileType extends DqType("profile")
+case object ValidityType extends DqType("validity")
+
+object DqType {
+  def parse(tp: String): DqType = new DqType(tp)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala
new file mode 100644
index 0000000..bea1ea6
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/DumpParam.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class DumpParam( @JsonProperty("dir") dir: String,
+//                    @JsonProperty("field.separator") fieldSep: String,
+//                    @JsonProperty("line.separator") lineSep: String,
+                      @JsonProperty("table.name") tableName: String,
+                      @JsonProperty("schema") schema: List[SchemaFieldParam]
+                    ) extends ConfigParam {
+
+}
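One note on DqType above: parse builds a fresh instance rather than resolving to one of the case objects, so callers must compare the tp field rather than the reference; a small sketch:

import org.apache.griffin.config.params.{AccuracyType, DqType}

object DqTypeSketch {
  def main(args: Array[String]): Unit = {
    val dq = DqType.parse("accuracy")
    println(dq == AccuracyType)          // false: reference equality on a plain class
    println(dq.tp == AccuracyType.tp)    // true: compare the underlying string
  }
}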
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala
new file mode 100644
index 0000000..80f94f4
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/MappingParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class MappingParam( @JsonProperty("source.name") sourceName: String,
+                         @JsonProperty("target.name") targetName: String,
+                         @JsonProperty("isPK") isPK: Boolean
+                       ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala
new file mode 100644
index 0000000..bf21d20
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseConfigParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class ParseConfigParam( @JsonProperty("out.schema") schema: List[SchemaFieldParam]
+                           ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala
new file mode 100644
index 0000000..d7d3d22
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/ParseParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class ParseParam( @JsonProperty("in.type") inType: String,
+                       @JsonProperty("out.type") outType: String,
+                       @JsonProperty("config") configParam: ParseConfigParam
+                     ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala
new file mode 100644
index 0000000..d8a186d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/PrepParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class PrepParam( @JsonProperty("sample") sampleParam: SampleParam,
+                      @JsonProperty("parse") parseParam: ParseParam
+                    ) extends ConfigParam {
+
+}
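Assembling the accuracy mapping by hand, as Jackson would after reading the dq.config block; the field names are illustrative, and matchValidTime is left null here because TimeRangeParam is defined in a separate file of this commit:

import org.apache.griffin.config.params.{AccuracyParam, MappingParam}

object MappingSketch {
  def main(args: Array[String]): Unit = {
    val accuracy = AccuracyParam(
      mapping = List(
        MappingParam(sourceName = "id", targetName = "uid", isPK = true),        // PK pairs source and target
        MappingParam(sourceName = "amount", targetName = "total", isPK = false)
      ),
      matchValidTime = null
    )
    println(accuracy.mapping.filter(_.isPK).map(_.sourceName))   // List(id)
  }
}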
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala
new file mode 100644
index 0000000..e407a21
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RecorderParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class RecorderParam( @JsonProperty("types") types: List[String],
+                          @JsonProperty("metric.name") metricName: String,
+                          @JsonProperty("config") config: Map[String, String]
+                        ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala
new file mode 100644
index 0000000..46ebe07
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/RetryParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class RetryParam( @JsonProperty("need.retry") needRetry: Boolean,
+                       @JsonProperty("next.retry.seconds") nextRetryInterval: Long,
+                       @JsonProperty("interval.seconds") interval: Long
+                     ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala
new file mode 100644
index 0000000..3749702
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SampleParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class SampleParam( @JsonProperty("need.sample") needSample: Boolean,
+                        @JsonProperty("ratio") ratio: Double
+                      ) extends ConfigParam {
+
+}
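Finally, an illustrative assembly of the retry and recorder sections; the values are placeholders, and the RetryParam feeds CacheProcesser, which scales nextRetryInterval from seconds to milliseconds:

import org.apache.griffin.cache.CacheProcesser
import org.apache.griffin.config.params.{RecorderParam, RetryParam}

object ParamsSketch {
  def main(args: Array[String]): Unit = {
    val retry = RetryParam(needRetry = true, nextRetryInterval = 60L, interval = 600L)
    val recorder = RecorderParam(
      types = List("hdfs", "post"),                         // illustrative recorder type names
      metricName = "streaming_accu",
      config = Map("hdfs.dir" -> "hdfs:///griffin/record")
    )
    val processer = CacheProcesser(retry)
    println(processer.nextRetryInterval)   // 60000: seconds scaled to milliseconds
    println(recorder.types)
  }
}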
