The second half of this proposal is not finished yet, but I'd like to send the draft out for discussion.
Thanks,
Baoqi Wu

Based on previous work by @chongchongzi, mainly the following issue and pull request:

- issue 1306: [FEATURE] Data quality inspection component (数据质量检测组件)
  https://github.com/apache/incubator-dolphinscheduler/issues/1306
- PR 1396: https://github.com/apache/incubator-dolphinscheduler/pull/1396/files

I propose the following idea for implementing this "Data Quality Check" Task component.

** 1. Goals and Non-Goals for this "Data Quality Check" Task Component

Since DolphinScheduler's main business focuses on Workflow Orchestration (Task Scheduling), this component is definitely NOT meant to compete with full-fledged Data Quality Check products such as Apache Griffin (https://github.com/apache/griffin) or WeBank's Qualitis (https://github.com/WeBankFinTech/Qualitis). But "Data Quality" is so important throughout a data pipeline's lifetime that providing basic, yet generic, support for "Data Quality Check" will benefit casual Data Quality Check users.

Goals:
- This "Data Quality Check" is a new Task Type, and its functionality should be kept as isolated as possible
- It should be generic enough to suit different use cases and to run against different RDBMSs
- It can solve real-world Data Quality Check scenarios
- The solution should not be hard to implement, and should not conflict with DolphinScheduler's future development direction

Non-Goals:
- A full-featured Data Quality Check product
- An easy-to-use configuration GUI, even for non-technical people
- A centralized "Data Quality Rule Gallery/Repository" is not supported; users need to craft the rules/SQL/check expressions inside the Task node directly

** 2. Basic Concept for the "Data Quality Check" Task Component

This component will be mainly based on the SQL Task. So, in this component, we first need to select a Data Source; the following checks will then run against that Data Source (or that RDBMS).

The component is divided into the following 3 steps:
1. Measure
2. Check
3. Pass

*** 2.1. Measure
The first step is to fetch selected data from the database; the measures fetched here will be used in "Step 2: Check".

NOTE: each measure's SQL should return exactly one ROW, although that single row may optionally contain several columns.

The GUI for this "Measure" step should look like:

| Measure ID | Measure Name            | Description | SQL                                                                   | RESULT_TYPE  |
|------------+-------------------------+-------------+-----------------------------------------------------------------------+--------------|
| 0          | Null Count              | check null  | select sum(cast((`col1` is null) as int)) from table1                 | SINGLE_VALUE |
| 1          | count vs count distinct | data uniq   | select count(`order`) as c, count(distinct `order`) as cd from table2 | LIST         |
| 2          | multiple values         |             | select count(`order`) as c, count(distinct `order`) as cd from table2 | MAP          |

After running the measures one by one, the measure result values will be:
- measures["Null Count"] = 100
- measures["count vs count distinct"] = [100, 10]
- measures["multiple values"] = {"c":100, "cd":10}

Other common scenarios:
- Check whether 2 columns are equal, which translates to "get the total count of records where the 2 columns are not equal":
  - SELECT sum(cast((`col1` <> `col2`) as int)) from table3
  - (The SQL above works in Spark, but the "cast as int" trick may not work on other databases)

*** 2.2. Check

Once all measures are ready, we can define Check rules to validate them.
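To make concrete what the Check step will consume: the measures produced in Step 1 could be kept in a `Map` keyed by measure name, with each single-row SQL result converted according to its RESULT_TYPE. Below is a minimal sketch, assuming the row has already been fetched into column-name/value lists; the class and method names (`MeasureResult`, `toMeasureValue`) are hypothetical, not existing DolphinScheduler code:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MeasureResult {

    enum ResultType { SINGLE_VALUE, LIST, MAP }

    // Convert the single row returned by a measure's SQL into the value stored
    // under measures[measureName], according to the measure's RESULT_TYPE.
    static Object toMeasureValue(List<String> columnNames, List<?> rowValues, ResultType type) {
        switch (type) {
            case SINGLE_VALUE:
                return rowValues.get(0);           // exactly one column expected
            case LIST:
                return rowValues;                  // keep all columns, in order
            case MAP:
                Map<String, Object> byColumn = new LinkedHashMap<>();
                for (int i = 0; i < columnNames.size(); i++) {
                    byColumn.put(columnNames.get(i), rowValues.get(i));
                }
                return byColumn;                   // column name -> value
            default:
                throw new IllegalArgumentException("unknown RESULT_TYPE: " + type);
        }
    }

    public static void main(String[] args) {
        // Simulated single-row results of the three example measures.
        Map<String, Object> measures = new LinkedHashMap<>();
        measures.put("Null Count",
                toMeasureValue(Arrays.asList("nulls"), Arrays.asList(100), ResultType.SINGLE_VALUE));
        measures.put("count vs count distinct",
                toMeasureValue(Arrays.asList("c", "cd"), Arrays.asList(100, 10), ResultType.LIST));
        measures.put("multiple values",
                toMeasureValue(Arrays.asList("c", "cd"), Arrays.asList(100, 10), ResultType.MAP));
        System.out.println(measures);
        // → {Null Count=100, count vs count distinct=[100, 10], multiple values={c=100, cd=10}}
    }
}
```

In a real implementation the column names and values would come from a JDBC `ResultSet` and its `ResultSetMetaData`; the plain lists above just keep the sketch self-contained.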
NOTE: each Check rule's result should be a BOOLEAN.

The GUI for this "Check" step should look like:

| Check ID | Check Name                | Description | Expression                                                            |
|----------+---------------------------+-------------+-----------------------------------------------------------------------|
| 0        | no null value             | long text   | measures["Null Count"] == 0                                           |
| 1        | count bigger than 50      | long text   | measures["count vs count distinct"][0] > 50                           |
| 2        | count > 50 in another way | long text   | measures["multiple values"]["c"] > 50                                 |
| 3        | all order is unique       | long text   | measures["multiple values"]["c"] == measures["multiple values"]["cd"] |

NOTE:
- the syntax is Java-like, not SQL
- these checks are not performed against the database, but executed in the JVM's memory

The result of each check will be:
- checks["no null value"] = false
- checks["count bigger than 50"] = true
- checks["count > 50 in another way"] = true
- checks["all order is unique"] = false

In this step, we need to implement or use an expression language, such as Apache commons-el, so that we can evaluate simple expressions.

NOTE: why introduce an Expression Language? Because EL support will not ONLY benefit Data Quality Check; later we can use it in several other places:
- Custom Variable manipulation.
  For example, custom parameters (自定义参数) are currently very limited, but if we support EL we can construct those variable values more easily
- Later, if we want to support an "IF-ELSE" Task Component, this EL will be beneficial
- Users can extend the EL with more User Defined Functions
- More functions become possible, e.g.:
  - get the last 30 days' measure results for the current Data Quality Check node: get_previous_measure_values("Null Count", 30)
  - get the last 20 measure results for another Data Quality Check node in the same process definition: get_previous_measure_values_for_other_task("data_check_for_another_db", "Null Count", 20)

There are several EL-like implementations, such as:
- Apache Commons EL
- Apache Commons JEXL
- MVEL

Based on my limited knowledge, I suggest using the Commons-EL library, although its latest release dates back to 2003-06-18. Its functionality seems the most limited of the three, but limited functionality means it is more secure to embed, and StreamSets Data Collector also uses commons-el and has implemented a very rich expression evaluation system on top of it (https://github.com/streamsets/datacollector/blob/master/container-common/src/main/java/com/streamsets/datacollector/el/ELEvaluator.java#L147).

Apache Commons JEXL seems a good competitor: it is more modern, but its rich functionality makes it easy to introduce security problems, such as "new java.io.File('file').delete()". If we decide JEXL's advantages outweigh Commons-EL's, we need to implement some sandbox feature for JEXL to make it more secure, like this one: https://code.onehippo.org/cms-community/hippo-cms/commit/7f6374a89a5a2d15edebc7340f1539bc00fbdb82?view=parallel <<CMS-10872: Apply sandbox in JexlEngine to be safe>>

MVEL has the largest feature set (it can even define functions), so it may be the hardest to secure.

*** 2.3. Pass

The final "Pass" step defines whether this Task Instance will be marked as Success or Failure.
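Before the Pass step consumes the boolean check results, here is a sketch of how the Check step's semantics could work, evaluating the four example rules in JVM memory. Plain Java predicates stand in for whichever EL library is eventually chosen, and every name (`CheckStep`, `sampleRules`, `evaluate`) is illustrative only:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class CheckStep {

    // Measure results produced by the "Measure" step (values from the example tables).
    static Map<String, Object> sampleMeasures() {
        Map<String, Object> measures = new LinkedHashMap<>();
        measures.put("Null Count", 100);
        measures.put("count vs count distinct", Arrays.asList(100, 10));
        Map<String, Object> multi = new LinkedHashMap<>();
        multi.put("c", 100);
        multi.put("cd", 10);
        measures.put("multiple values", multi);
        return measures;
    }

    // Each check rule takes the measures map and must yield a BOOLEAN.
    @SuppressWarnings("unchecked")
    static Map<String, Predicate<Map<String, Object>>> sampleRules() {
        Map<String, Predicate<Map<String, Object>>> rules = new LinkedHashMap<>();
        // measures["Null Count"] == 0
        rules.put("no null value",
                m -> ((Integer) m.get("Null Count")) == 0);
        // measures["count vs count distinct"][0] > 50
        rules.put("count bigger than 50",
                m -> ((List<Integer>) m.get("count vs count distinct")).get(0) > 50);
        // measures["multiple values"]["c"] > 50
        rules.put("count > 50 in another way",
                m -> ((Integer) ((Map<String, Object>) m.get("multiple values")).get("c")) > 50);
        // measures["multiple values"]["c"] == measures["multiple values"]["cd"]
        rules.put("all order is unique", m -> {
            Map<String, Object> multi = (Map<String, Object>) m.get("multiple values");
            return multi.get("c").equals(multi.get("cd"));
        });
        return rules;
    }

    // Checks run in the JVM's memory, not against the database.
    static Map<String, Boolean> evaluate(Map<String, Object> measures,
                                         Map<String, Predicate<Map<String, Object>>> rules) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        rules.forEach((name, rule) -> results.put(name, rule.test(measures)));
        return results;
    }

    public static void main(String[] args) {
        System.out.println(evaluate(sampleMeasures(), sampleRules()));
        // → {no null value=false, count bigger than 50=true, count > 50 in another way=true, all order is unique=false}
    }
}
```

With a real EL, each `Predicate` would be replaced by parsing and evaluating the user's expression string against a context holding the `measures` map; the casts above are exactly the type-coercion work the EL would do for us.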
Its GUI looks like:

The Task will be considered Successful when:
- [x] Always (this is the default; it will always succeed)
- [ ] When all checks pass
- [ ] When any check passes
- [ ] When the following checks pass: { select the check rules }
- [ ] When the count of passed checks is bigger than x (x is a number)

** 3. The Storage for the "Data Quality Check" Result

We need to provide a way to store the results of measure, check and pass. As a first step, we may save them as JSON in a column of the "Task Instance" table, like:

{
  "measure": {
    "Null Count": 100,
    "count vs count distinct": [100, 10],
    "multiple values": {"c":100, "cd":10}
  },
  "check": {
    "no null value": false,
    "count bigger than 50": true,
    "count > 50 in another way": true,
    "all order is unique": false
  },
  "pass": true
}

By saving all those results, we can implement more advanced features (like those mentioned previously):
- get_previous_measure_values("Null Count", 30)
- get_previous_measure_values_for_other_task("data_check_for_another_db", "Null Count", 20)

Also, even within the same Process Instance, one "Data Quality Check" node could run against MySQL and save the total row count as a measure, while another "Data Quality Check" node runs against Hive; the later node can then compare whether the measures from those 2 nodes are the same.

** 4. Reuse the Same Storage Format for Task dropwizard "Metric" Support

The JSON storage used by the "Data Quality Check" node is very general; there is no need to limit it to the Data Quality Check task. For a Task's Metric Support, for example, it would record the following information:

{
  "metric": {
    "rowsCounter": 1000,
    "timeElapsed": 60000,
    "histo": [12, 13, 14]
  },
  "customMetrics": {
    "sqlRowInserted": 2000,
    "currentTimer": 1000
  }
}

It can record some basic task summary metrics, like how long the task ran and how many records it read. Also, once we finish Plugin support, each Plugin can define and save its own Metrics.

NOTE: this metric JSON can be combined with the previous measure/check/pass JSON
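The "Pass" strategies from Section 2.3, which produce the stored "pass" field above, could be computed roughly as follows. This is a sketch: the enum and method names are hypothetical, and the selected-checks option is omitted for brevity:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PassStep {

    // The checkbox options from the "Pass" GUI.
    enum PassStrategy { ALWAYS, ALL_CHECKS_PASS, ANY_CHECK_PASS, PASS_COUNT_BIGGER_THAN }

    // Decide task Success/Failure from the check results and the configured strategy.
    static boolean pass(Map<String, Boolean> checks, PassStrategy strategy, int threshold) {
        long passed = checks.values().stream().filter(Boolean::booleanValue).count();
        switch (strategy) {
            case ALWAYS:                 return true;                    // the default
            case ALL_CHECKS_PASS:        return passed == checks.size();
            case ANY_CHECK_PASS:         return passed > 0;
            case PASS_COUNT_BIGGER_THAN: return passed > threshold;      // "bigger than x"
            default:                     throw new IllegalArgumentException("unknown strategy");
        }
    }

    public static void main(String[] args) {
        // Check results from the Section 2.2 example.
        Map<String, Boolean> checks = new LinkedHashMap<>();
        checks.put("no null value", false);
        checks.put("count bigger than 50", true);
        checks.put("count > 50 in another way", true);
        checks.put("all order is unique", false);

        System.out.println(pass(checks, PassStrategy.ALWAYS, 0));                 // true
        System.out.println(pass(checks, PassStrategy.ALL_CHECKS_PASS, 0));        // false
        System.out.println(pass(checks, PassStrategy.ANY_CHECK_PASS, 0));         // true
        System.out.println(pass(checks, PassStrategy.PASS_COUNT_BIGGER_THAN, 1)); // true: 2 passed > 1
    }
}
```

The resulting boolean is what would be serialized as the "pass" field of the JSON shown in Section 3, and it decides whether the Task Instance is marked Success or Failure.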
** 5. Auto Chart Plots for those Measures / Checks / Pass / Metrics

At the Task Instance level, we can plot charts based on the Measure type or on the Dropwizard Metric type, for example by generating Vega-Lite (https://vega.github.io/vega-lite/) JSON specs.

Also, since we save the historical data, we can plot trend lines for those metrics: a user can get a chart of the previous 30 "Data Quality Check" runs to decide whether the Data Quality is improving or not.

So we can use the same logic to:
- get a Data Quality Report
- get a Metrics Report to know the current Task's running stats

*** Don't worry about the full functionality; we can implement it milestone by milestone
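P.S. As an illustration of the Section 5 charting idea, a minimal Vega-Lite spec plotting the trend of one stored measure across recent task instances might look like the following; the data values and field names here are purely illustrative, and in practice the "values" array would be filled from the stored measure JSON:

```json
{
  "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
  "description": "Trend of the 'Null Count' measure over recent Task Instances (illustrative data)",
  "data": {
    "values": [
      {"run": 1, "nullCount": 120},
      {"run": 2, "nullCount": 100},
      {"run": 3, "nullCount": 40}
    ]
  },
  "mark": "line",
  "encoding": {
    "x": {"field": "run", "type": "ordinal", "title": "Task Instance run"},
    "y": {"field": "nullCount", "type": "quantitative", "title": "Null Count"}
  }
}
```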
