Repository: incubator-griffin-site Updated Branches: refs/heads/asf-site ff8b8fb3d -> 0d4135a55
http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/gen-hive-data.sh ---------------------------------------------------------------------- diff --git a/data/gen-hive-data.sh b/data/gen-hive-data.sh deleted file mode 100755 index 6496eac..0000000 --- a/data/gen-hive-data.sh +++ /dev/null @@ -1,54 +0,0 @@ -#!/bin/bash - -#create table -hive -f create-table.hql -echo "create table done" - -#current hour -./gen_demo_data.sh -cur_date=`date +%Y%m%d%H` -dt=${cur_date:0:8} -hour=${cur_date:8:2} -partition_date="dt='$dt',hour='$hour'" -sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql -hive -f insert-data.hql -src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE -tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE -hadoop fs -touchz ${src_done_path} -hadoop fs -touchz ${tgt_done_path} -echo "insert data [$partition_date] done" - -#last hour -./gen_demo_data.sh -cur_date=`date -d '1 hour ago' +%Y%m%d%H` -dt=${cur_date:0:8} -hour=${cur_date:8:2} -partition_date="dt='$dt',hour='$hour'" -sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql -hive -f insert-data.hql -src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE -tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE -hadoop fs -touchz ${src_done_path} -hadoop fs -touchz ${tgt_done_path} -echo "insert data [$partition_date] done" - -#next hours -set +e -while true -do - ./gen_demo_data.sh - cur_date=`date +%Y%m%d%H` - next_date=`date -d "+1hour" '+%Y%m%d%H'` - dt=${next_date:0:8} - hour=${next_date:8:2} - partition_date="dt='$dt',hour='$hour'" - sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql - hive -f insert-data.hql - src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE - tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE - hadoop fs -touchz ${src_done_path} - hadoop fs -touchz 
${tgt_done_path} - echo "insert data [$partition_date] done" - sleep 3600 -done -set -e http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/gen_delta_src.sh ---------------------------------------------------------------------- diff --git a/data/gen_delta_src.sh b/data/gen_delta_src.sh deleted file mode 100755 index aeda259..0000000 --- a/data/gen_delta_src.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -file=delta_src -id=124 - -rm ${file} - -for i in {1..1000} -do - idx=`shuf -i1-2000 -n1` - echo "${id}|${idx}|${idx}" >> ${file} -done - http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/gen_demo_data.sh ---------------------------------------------------------------------- diff --git a/data/gen_demo_data.sh b/data/gen_demo_data.sh deleted file mode 100755 index d85f306..0000000 --- a/data/gen_demo_data.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -./gen_delta_src.sh - -src=demo_src -tgt=demo_tgt - -rm ${src} -cat demo_basic >> ${src} -cat delta_src >> ${src} - -rm ${tgt} -cat demo_basic >> ${tgt} -cat delta_tgt >> ${tgt} http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/insert-data.hql.template ---------------------------------------------------------------------- diff --git a/data/insert-data.hql.template b/data/insert-data.hql.template deleted file mode 100644 index 4e4039a..0000000 --- a/data/insert-data.hql.template +++ /dev/null @@ -1,2 +0,0 @@ -LOAD DATA LOCAL INPATH 'demo_src' INTO TABLE demo_src PARTITION (PARTITION_DATE); -LOAD DATA LOCAL INPATH 'demo_tgt' INTO TABLE demo_tgt PARTITION (PARTITION_DATE); http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/streaming/gen-data.sh ---------------------------------------------------------------------- diff --git a/data/streaming/gen-data.sh b/data/streaming/gen-data.sh new file mode 100755 index 0000000..e2ad8bd --- /dev/null +++ b/data/streaming/gen-data.sh @@ -0,0 +1,29 @@ +#!/bin/bash 
+ +#current time +cur_time=`date +%Y-%m-%d_%H:%M:%S` +sed s/TIME/$cur_time/ ./source.temp > source.tp +sed s/TIME/$cur_time/ ./target.temp > target.tp + +#create data +for row in 1 2 3 4 5 6 7 8 9 10 +do + sed -n "${row}p" < source.tp > sline + cnt=`shuf -i1-2 -n1` + clr="red" + if [ $cnt == 2 ]; then clr="yellow"; fi + sed s/COLOR/$clr/ sline >> source.data +done +rm sline + +cat target.tp > target.data + +rm source.tp target.tp + +#import data +kafka-console-producer.sh --broker-list localhost:9092 --topic source --new-producer < source.data +kafka-console-producer.sh --broker-list localhost:9092 --topic target --new-producer < target.data + +rm source.data target.data + +echo "insert data at ${cur_time}" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/streaming/source.temp ---------------------------------------------------------------------- diff --git a/data/streaming/source.temp b/data/streaming/source.temp new file mode 100644 index 0000000..91ff6be --- /dev/null +++ b/data/streaming/source.temp @@ -0,0 +1,10 @@ +{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"} +{"id": 2, "name": "Banana", "color": "COLOR", "time": "TIME"} +{"id": 3, "name": "Cherry", "color": "COLOR", "time": "TIME"} +{"id": 4, "name": "Durian", "color": "COLOR", "time": "TIME"} +{"id": 5, "name": "Lichee", "color": "COLOR", "time": "TIME"} +{"id": 6, "name": "Peach", "color": "COLOR", "time": "TIME"} +{"id": 7, "name": "Papaya", "color": "COLOR", "time": "TIME"} +{"id": 8, "name": "Lemon", "color": "COLOR", "time": "TIME"} +{"id": 9, "name": "Mango", "color": "COLOR", "time": "TIME"} +{"id": 10, "name": "Pitaya", "color": "COLOR", "time": "TIME"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/streaming/streaming-data.sh ---------------------------------------------------------------------- diff --git a/data/streaming/streaming-data.sh 
b/data/streaming/streaming-data.sh new file mode 100755 index 0000000..a52323c --- /dev/null +++ b/data/streaming/streaming-data.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +#create topics +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target + +#every minute +set +e +while true +do + ./gen-data.sh + sleep 60 +done +set -e \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/data/streaming/target.temp ---------------------------------------------------------------------- diff --git a/data/streaming/target.temp b/data/streaming/target.temp new file mode 100644 index 0000000..b744300 --- /dev/null +++ b/data/streaming/target.temp @@ -0,0 +1,10 @@ +{"id": 1, "name": "Apple", "color": "red", "time": "TIME"} +{"id": 2, "name": "Banana", "color": "yellow", "time": "TIME"} +{"id": 3, "name": "Cherry", "color": "red", "time": "TIME"} +{"id": 4, "name": "Durian", "color": "yellow", "time": "TIME"} +{"id": 5, "name": "Lichee", "color": "red", "time": "TIME"} +{"id": 6, "name": "Peach", "color": "red", "time": "TIME"} +{"id": 7, "name": "Papaya", "color": "yellow", "time": "TIME"} +{"id": 8, "name": "Lemon", "color": "yellow", "time": "TIME"} +{"id": 9, "name": "Mango", "color": "yellow", "time": "TIME"} +{"id": 10, "name": "Pitaya", "color": "red", "time": "TIME"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/docs/quickstart.html ---------------------------------------------------------------------- diff --git a/docs/quickstart.html b/docs/quickstart.html index e69999b..bb40d78 100644 --- a/docs/quickstart.html +++ b/docs/quickstart.html @@ -208,7 +208,7 @@ LOCATION ... </code></pre></div></div> <p>For demo_src and demo_tgt, there could be some different items between each other. 
-You can download <a href="/data">demo data</a> and execute <code class="highlighter-rouge">./gen_demo_data.sh</code> to get the two data source files. +You can download <a href="/data/batch">demo data</a> and execute <code class="highlighter-rouge">./gen_demo_data.sh</code> to get the two data source files. Then we will load data into both two tables for every hour.</p> <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>LOAD DATA LOCAL INPATH 'demo_src' INTO TABLE demo_src PARTITION (dt='20180912',hour='09'); LOAD DATA LOCAL INPATH 'demo_tgt' INTO TABLE demo_tgt PARTITION (dt='20180912',hour='09'); @@ -316,7 +316,7 @@ LOAD DATA LOCAL INPATH 'demo_tgt' INTO TABLE demo_tgt PARTITION (dt='20180912',h <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \ --driver-memory 1g --executor-memory 1g --num-executors 2 \ <path>/griffin-measure.jar \ -<path>/env.json <path>/batch-accu-config.json +<path>/env.json <path>/dq.json </code></pre></div></div> <h2 id="report-data-quality-metrics">Report data quality metrics</h2> http://git-wip-us.apache.org/repos/asf/incubator-griffin-site/blob/0d4135a5/docs/usecases.html ---------------------------------------------------------------------- diff --git a/docs/usecases.html b/docs/usecases.html index a07cdfc..8231246 100644 --- a/docs/usecases.html +++ b/docs/usecases.html @@ -127,7 +127,240 @@ under the License. 
<div class="col-xs-6 col-sm-9 page-main-content" style="margin-left: -15px" id="loadcontent"> <h1 class="page-header" style="margin-top: 0px">Streaming Use Cases</h1> <h2 id="user-story">User Story</h2> -<p>Say we have two streaming data set(topic_src, topic_tgt), we need to know what is the data quality for target data set, based on source data set.</p> +<p>Say we have two streaming data sets in different kafka topics(source, target), we need to know what is the data quality for target data set, based on source data set.</p> + +<p>For simplicity, suppose both two topics' data are json string which would be like this:</p> +<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"} +{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"} +... +</code></pre></div></div> + +<h2 id="environment-preparation">Environment Preparation</h2> +<p>You need to prepare the environment for Apache Griffin measure module, including the following software:</p> +<ul> + <li>JDK (1.8+)</li> + <li>Hadoop (2.6.0+)</li> + <li>Spark (2.2.1+)</li> + <li>Kafka (0.8.x)</li> + <li>Zookeeper (3.5+)</li> +</ul> + +<h2 id="build-griffin-measure-module">Build Griffin Measure Module</h2> +<ol> + <li>Download Griffin source package <a href="https://www.apache.org/dist/incubator/griffin/0.3.0-incubating">here</a>.</li> + <li>Unzip the source package. + <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>unzip griffin-0.3.0-incubating-source-release.zip +cd griffin-0.3.0-incubating-source-release +</code></pre></div> </div> + </li> + <li>Build Griffin jars. 
+ <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mvn clean install +</code></pre></div> </div> + + <p>Move the built griffin measure jar to your work path.</p> + + <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mv measure/target/measure-0.3.0-incubating.jar <work path>/griffin-measure.jar +</code></pre></div> </div> + </li> +</ol> + +<h2 id="data-preparation">Data Preparation</h2> + +<p>For our quick start, We will create two kafka topics(source, target) and generate data in json string format for them minutely.</p> +<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code># create topics +# Note: it just works for kafka 0.8 +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target +</code></pre></div></div> +<p>The data would be generated like this:</p> +<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"} +{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"} +</code></pre></div></div> +<p>For topic source and target, there could be some different items between each other. 
+You can download <a href="/data/streaming">demo data</a> and execute <code class="highlighter-rouge">./streaming-data.sh</code> to generate json string data file and produce them into kafka topics minutely.</p> + +<h2 id="define-data-quality-measure">Define data quality measure</h2> + +<h4 id="griffin-env-configuration">Griffin env configuration</h4> +<p>The environment config file: env.json</p> +<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{ + "spark": { + "log.level": "WARN", + "checkpoint.dir": "hdfs:///griffin/checkpoint", + "batch.interval": "20s", + "process.interval": "1m", + "init.clear": true, + "config": { + "spark.default.parallelism": 4, + "spark.task.maxFailures": 5, + "spark.streaming.kafkaMaxRatePerPartition": 1000, + "spark.streaming.concurrentJobs": 4, + "spark.yarn.maxAppAttempts": 5, + "spark.yarn.am.attemptFailuresValidityInterval": "1h", + "spark.yarn.max.executor.failures": 120, + "spark.yarn.executor.failuresValidityInterval": "1h", + "spark.hadoop.fs.hdfs.impl.disable.cache": true + } + }, + "sinks": [ + { + "type": "console" + }, + { + "type": "hdfs", + "config": { + "path": "hdfs:///griffin/persist" + } + }, + { + "type": "elasticsearch", + "config": { + "method": "post", + "api": "http://es:9200/griffin/accuracy" + } + } + ], + "griffin.checkpoint": [ + { + "type": "zk", + "config": { + "hosts": "zk:2181", + "namespace": "griffin/infocache", + "lock.path": "lock", + "mode": "persist", + "init.clear": true, + "close.clear": false + } + } + ] +} +</code></pre></div></div> + +<h4 id="define-griffin-data-quality">Define griffin data quality</h4> +<p>The DQ config file: dq.json</p> + +<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{ + "name": "streaming_accu", + "process.type": "streaming", + "data.sources": [ + { + "name": "src", + "baseline": true, + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": 
"kafka:9092", + "group.id": "griffin", + "auto.offset.reset": "largest", + "auto.commit.enable": "false" + }, + "topics": "source", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "rule": "from_json" + } + ] + } + ], + "checkpoint": { + "type": "json", + "file.path": "hdfs:///griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-5m", "0"], + "updatable": true + } + }, { + "name": "tgt", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "kafka:9092", + "group.id": "griffin", + "auto.offset.reset": "largest", + "auto.commit.enable": "false" + }, + "topics": "target", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "rule": "from_json" + } + ] + } + ], + "checkpoint": { + "type": "json", + "file.path": "hdfs:///griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-1m", "0"] + } + } + ], + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "out.dataframe.name": "accu", + "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time", + "details": { + "source": "src", + "target": "tgt", + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + }, + "out":[ + { + "type":"metric", + "name": "accu" + }, + { + "type":"record", + "name": "missRecords" + } + ] + } + ] + }, + "sinks": ["CONSOLE", "HDFS"] +} +</code></pre></div></div> + +<h2 id="measure-data-quality">Measure data quality</h2> +<p>Submit the measure job to Spark, with config file paths as parameters.</p> + +<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.griffin.measure.Application 
--master yarn --deploy-mode client --queue default \ +--driver-memory 1g --executor-memory 1g --num-executors 3 \ +<path>/griffin-measure.jar \ +<path>/env.json <path>/dq.json +</code></pre></div></div> + +<h2 id="report-data-quality-metrics">Report data quality metrics</h2> +<p>Then you can get the calculation log in console, when the job runs, you can get the result metrics printed minutely. The related results will also be saved in hdfs: <code class="highlighter-rouge">hdfs:///griffin/persist/<job name>/</code>, listing in different directories named by calculate timestamps.</p> + +<h2 id="refine-data-quality-report">Refine Data Quality report</h2> +<p>Depends on your business, you might need to refine your data quality measure further till you are satisfied.</p> + +<h2 id="more-details">More Details</h2> +<p>For more details about griffin measures, you can visit our documents in <a href="https://github.com/apache/incubator-griffin/tree/master/griffin-doc">github</a>.</p> </div><!--end of loadcontent-->