The second half of this proposal is not finished yet, but I'd like to
send the draft out for discussion.

Thanks,
Baoqi Wu



Based on previous work by @chongchongzi, mainly the following issue/pull
request:
  - issue 1306: [FEATURE] Data quality inspection component
https://github.com/apache/incubator-dolphinscheduler/issues/1306
  - PR 1396:
https://github.com/apache/incubator-dolphinscheduler/pull/1396/files

I propose the following idea for how to implement this "Data Quality
Check" Task component.

** 1. Goal and Non-Goal for this "Data Quality Check" Task Component

Since DolphinScheduler's main business focuses on Workflow Orchestration
(Task Scheduling), this component is definitely NOT meant to compete with
other full-fledged Data Quality Check products, like Apache Griffin (
https://github.com/apache/griffin) or WeBank's Qualitis (
https://github.com/WeBankFinTech/Qualitis).

But "Data Quality" is so important throughout a data pipeline's lifetime
that providing some basic, yet generic, support for "Data Quality Check"
will benefit casual Data Quality Check users.

Goal:
  - This "Data Quality Check" is a new Task Type, and its functionality
should be kept as isolated as possible
  - It should be generic enough to suit different use cases and to run
against different RDBMSs
  - It can solve real-world Data Quality Check scenarios
  - The solution should not be very hard to implement, and should not
conflict with DolphinScheduler's future development direction

Non-Goal:
  - A full-featured Data Quality Check product
  - An easy-to-use configuration GUI, even for non-technical people
  - A centralized "Data Quality Rule Gallery/Repository"; users need to
craft the rules/SQL/check expressions inside the Task node directly

** 2. Basic Concept for "Data Quality Check" Task Component
This component will mainly be based on the SQL Task. So, in this component,
we first need to select a Data Source; the following checks will then run
against that Data Source (or that RDBMS).

This component will be divided into the following 3 steps:
  1. Measure
  2. Check
  3. Pass

*** 2.1. Measure
The first step is to fetch selected data from the database; the measures
fetched from the database will be used in "Step 2: Check".

  - NOTE: the result of each measure should be exactly one ROW, although
this single row can optionally contain several columns.


The GUI for this "Measure" part should be like:


| Measure ID | Measure Name            | Description | SQL                                                                | RESULT_TYPE  |
|------------+-------------------------+-------------+--------------------------------------------------------------------+--------------|
|          0 | Null Count              | check null  | select sum(cast((`col1` is null) as int)) from table1              | SINGLE_VALUE |
|          1 | count vs count distinct | data uniq   | select count(order) as c, count(distinct order) as cd from table2  | LIST         |
|          2 | multiple values         |             | select count(order) as c, count(distinct order) as cd from table2  | MAP          |


After running the measures one by one, the measure result values will be:
  - measures["Null Count"] = 100
  - measures["count vs count distinct"] = [100, 10]
  - measures["multiple values"] = {"c":100, "cd":10}

Other common scenarios:
  - check whether 2 columns are equal (translated to "get the total count
of those records where the 2 columns are not equal"; the following SQL
works in Spark, but for other databases the cast-as-int trick may not work):
    - SELECT sum(cast((`col1` <> `col2`) as int)) from table3
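To make the three RESULT_TYPE shapes concrete, here is a minimal sketch of how the single result ROW of each measure query could be turned into a measure value. The function name `shape_measure_row` and the RESULT_TYPE handling are illustrative assumptions, not the final API; the rows are hard-coded stand-ins for real query results.

```python
# Sketch (illustrative, not the final API): map the single result ROW
# of a measure query to the value shape implied by its RESULT_TYPE.

def shape_measure_row(columns, row, result_type):
    """columns: list of column names; row: one tuple of values."""
    if result_type == "SINGLE_VALUE":
        return row[0]                      # e.g. 100
    if result_type == "LIST":
        return list(row)                   # e.g. [100, 10]
    if result_type == "MAP":
        return dict(zip(columns, row))     # e.g. {"c": 100, "cd": 10}
    raise ValueError(f"unknown RESULT_TYPE: {result_type}")

measures = {}
# Pretend the SQL statements above already ran and returned these rows:
measures["Null Count"] = shape_measure_row(["n"], (100,), "SINGLE_VALUE")
measures["count vs count distinct"] = shape_measure_row(["c", "cd"], (100, 10), "LIST")
measures["multiple values"] = shape_measure_row(["c", "cd"], (100, 10), "MAP")
```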

*** 2.2. Check
Once all measures are ready, we can define some Check rules to validate
them.

NOTE: each Check rule's result should be a BOOLEAN

The GUI for this "Check" step should be like:

| Check ID | Check Name                | Description | Expression                                                             |
|----------+---------------------------+-------------+------------------------------------------------------------------------|
|        0 | no null value             | long text   | measures["Null Count"] == 0                                            |
|        1 | count bigger than 50      | long text   | measures["count vs count distinct"][0] > 50                            |
|        2 | count > 50 in another way | long text   | measures["multiple values"]["c"] > 50                                  |
|        3 | all order is unique       | long text   | measures["multiple values"]["c"] == measures["multiple values"]["cd"]  |

NOTE:
  - the syntax is Java-like, not SQL
  - these checks are not performed against the database, but evaluated in
the JVM's memory

The result of each check will be:
  - checks["no null value"] = false
  - checks["count bigger than 50"] = true
  - checks["count > 50 in another way"] = true
  - checks["all order is unique"] = false
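The data flow from measures to check results can be sketched as below. The real component would evaluate these expressions with a Java expression language; here Python's `eval` (with builtins disabled) is only a stand-in to show the mechanics, since the example expressions happen to be valid Python as well.

```python
# Sketch: evaluate Check expressions against the measure results.
# Python eval is a stand-in for the Java EL the component would use.

measures = {
    "Null Count": 100,
    "count vs count distinct": [100, 10],
    "multiple values": {"c": 100, "cd": 10},
}

check_rules = {
    "no null value": 'measures["Null Count"] == 0',
    "count bigger than 50": 'measures["count vs count distinct"][0] > 50',
    "count > 50 in another way": 'measures["multiple values"]["c"] > 50',
    "all order is unique":
        'measures["multiple values"]["c"] == measures["multiple values"]["cd"]',
}

checks = {
    # disable builtins so the expression can only read the measures dict
    name: bool(eval(expr, {"__builtins__": {}}, {"measures": measures}))
    for name, expr in check_rules.items()
}
```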


In this step, we need to implement or use an expression language, like
Apache Commons EL, so that we can evaluate simple expressions. NOTE: the
reason to introduce an Expression Language is that this EL support will
NOT ONLY benefit Data Quality Check; later we can use it in several other
places:
  - Custom variable manipulation. For example, the current custom
parameter support is very limited, but if we later support EL, we can
construct those variable values more easily
  - Later, if we want to support an "IF-ELSE" Task Component, this EL will
be beneficial
  - Users can extend the EL with more User Defined Functions
  - More functions become possible:
    - get the last 30 days' measure results for the current Data Quality
Check node:
      get_previous_measure_values("Null Count", 30)
    - get the last 20 measure results for another Data Quality Check node
in the same process definition:
get_previous_measure_values_for_other_task("data_check_for_another_db",
"Null Count", 20)

There are several EL-like implementations, like:
  - Apache Commons EL
  - Apache Commons JEXL
  - MVEL

Based on my limited knowledge, I suggest using the Commons-EL library,
although its latest release dates back to 2003-06-18. Its functionality
seems the most limited (and limited functionality means more security when
embedded), and StreamSets Data Collector also uses commons-el and has
implemented very rich functionality for its expression evaluation system
on top of it (
https://github.com/streamsets/datacollector/blob/master/container-common/src/main/java/com/streamsets/datacollector/el/ELEvaluator.java#L147
)


But Apache Commons JEXL seems a good competitor. JEXL is more modern, but
it has rich functionality, so it is easy to create security problems, like
"new java.io.File('file').delete()". If we think JEXL has advantages over
Commons-EL, we need to implement some sandbox feature for JEXL to make it
more secure, like this one:
https://code.onehippo.org/cms-community/hippo-cms/commit/7f6374a89a5a2d15edebc7340f1539bc00fbdb82?view=parallel
  <<CMS-10872: Apply sandbox in JexlEngine to be safe>>


Meanwhile, MVEL has the richest functionality (it can even define
functions), so it may be the hardest to make secure.

*** 2.3. Pass
The final "Pass" step defines whether this Task Instance will be marked as
Success or Failure. Its GUI is like:

The Task will be considered as Success when:
  - [x] Always   (this is the default; the task will always succeed)
  - [ ] When all checks pass
  - [ ] When any check passes
  - [ ] When the following checks pass:  { select the check rules }
  - [ ] When the count of passed checks is bigger than x   (x is a number)
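The pass options above boil down to a small decision function. The following is a minimal sketch; the mode names (`ALWAYS`, `ALL`, `ANY`, `SELECTED`, `COUNT`) and parameters are illustrative assumptions, not the final configuration format.

```python
# Sketch (mode names are assumptions): decide the final Pass result
# from the boolean check outcomes.

def evaluate_pass(checks, mode, selected=None, threshold=0):
    results = list(checks.values())
    if mode == "ALWAYS":
        return True                        # default: always succeed
    if mode == "ALL":
        return all(results)                # every check must pass
    if mode == "ANY":
        return any(results)                # at least one check passes
    if mode == "SELECTED":
        return all(checks[name] for name in (selected or []))
    if mode == "COUNT":
        return sum(results) > threshold    # count of passes > x
    raise ValueError(f"unknown pass mode: {mode}")

checks = {"no null value": False, "count bigger than 50": True,
          "count > 50 in another way": True, "all order is unique": False}
evaluate_pass(checks, "ALL")                 # False: two checks fail
evaluate_pass(checks, "ANY")                 # True
evaluate_pass(checks, "COUNT", threshold=1)  # True: 2 passes > 1
```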

** 3. The storage for "Data Quality Check" Results
We need to provide a way to store the results of measure, check, and pass.
As a first step, we may save those as JSON in a column of the "Task
Instance" table, like:

{
  "measure": {
     "Null Count": 100,
     "count vs count distinct": [100, 10],
     "multiple values": {"c":100, "cd":10}
  },
  "check": {
     "no null value": false,
     "count bigger than 50": true,
     "count > 50 in another way": true,
     "all order is unique": false
  },
  "pass": true
}


By saving all those results, we can implement more advanced features (like
those mentioned previously):
  - get_previous_measure_values("Null Count", 30)
  - get_previous_measure_values_for_other_task("data_check_for_another_db",
"Null Count", 20)

Also, even within the same Process Instance, one "Data Quality Check" node
can run against MySQL and save the total row count as a measure, while
another "Data Quality Check" node runs against Hive; the latter node can
then compare whether the measures from those 2 nodes are the same.
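A sketch of how get_previous_measure_values() could work on top of this storage: read the last N stored result JSONs for a task and extract one measure's history. The list of JSON strings here is a stand-in for rows of the "Task Instance" table, and the function signature is an assumption.

```python
# Sketch: extract one measure's history from stored result JSONs.
# result_jsons stands in for rows read from the Task Instance table.
import json

def get_previous_measure_values(result_jsons, measure_name, n):
    """result_jsons: stored JSON strings, newest first."""
    history = []
    for raw in result_jsons[:n]:
        doc = json.loads(raw)
        if measure_name in doc.get("measure", {}):
            history.append(doc["measure"][measure_name])
    return history

# Three fake stored task-instance results, newest first:
stored = [json.dumps({"measure": {"Null Count": v}, "check": {}, "pass": True})
          for v in (100, 90, 0)]
get_previous_measure_values(stored, "Null Count", 30)   # [100, 90, 0]
```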


** 4. Reuse the same Storage format with Task Dropwizard "Metric" support

The JSON storage used by the "Data Quality Check" node is very general; we
don't need to limit it to the Data Quality Check task only.


For Task Metric support, for example, it could record the following
information:

{
   "metric": {
      "rowsCounter": 1000,
      "timeElapsed": 60000,
      "histo": [12, 13, 14]
   },
   "customMetrics": {
      "sqlRowInserted": 2000,
      "currentTimer": 1000
   }
}

It can record some basic task summary metrics, like how long the task
lasted and how many records it read; also, once we finish Plugin support,
each Plugin can define and save its own Metrics.

NOTE: this metric JSON can be combined with the previous measure/check/pass JSON
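Combining the two documents could be as simple as merging the top-level keys, since the example key sets do not overlap. A minimal sketch, assuming a plain key merge is acceptable:

```python
# Sketch: merge the measure/check/pass JSON with the metric JSON into
# one document stored on the task instance. The key names follow the
# examples above; the merge strategy is an assumption.
import json

quality = {"measure": {"Null Count": 100},
           "check": {"no null value": False},
           "pass": True}
metric = {"metric": {"rowsCounter": 1000, "timeElapsed": 60000},
          "customMetrics": {"sqlRowInserted": 2000}}

combined = {**quality, **metric}   # top-level keys do not overlap
stored = json.dumps(combined)      # one JSON column value
```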

** 5. Auto Chart Plots for those Measures / Checks / Pass / Metrics

At the Task Instance level, we can plot some charts based on the Measure
type, or on the Dropwizard Metric type. For example, by generating
Vega-Lite (https://vega.github.io/vega-lite/) JSON specs to plot.



Also, since we have saved the previous history data, we can plot trend
lines for those metrics; a user can get a chart of the previous 30 "Data
Quality Check" runs to decide whether the Data Quality is improving or not.
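As an illustration of the Vega-Lite idea, here is a minimal sketch that builds a trend-line spec from a measure's history. The field names (`run`, `null_count`) and the data shape are assumptions for the example only.

```python
# Sketch: build a minimal Vega-Lite line-chart spec for the trend of
# one measure over previous runs. Field names are illustrative.
import json

history = [{"run": i, "null_count": v} for i, v in enumerate([100, 90, 0])]
spec = {
    "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
    "data": {"values": history},
    "mark": "line",
    "encoding": {
        "x": {"field": "run", "type": "ordinal"},
        "y": {"field": "null_count", "type": "quantitative"},
    },
}
vega_lite_json = json.dumps(spec)  # hand this JSON to the frontend to render
```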

So, we can use the same logic to:
  - get a Data Quality Report
  - get a Metrics Report to know the current Task's running stats

*** Don't worry about the full functionality; we can implement it
milestone by milestone
