This is an automated email from the ASF dual-hosted git repository.
wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new fa11527 [GRIFFIN-319] Deprecate old Data Connectors
fa11527 is described below
commit fa11527ea9a694c7d778d3ba2ea007db3dbf21a8
Author: chitralverma <[email protected]>
AuthorDate: Mon Jan 6 17:12:03 2020 +0800
[GRIFFIN-319] Deprecate old Data Connectors
**What changes were proposed in this pull request?**
This ticket aims to inform users of the deprecated data source connectors.
Deprecated connectors:
- MySqlDataConnector in favour of JDBCBasedDataConnector
- AvroBatchDataConnector in favour of FileBasedDataConnector
- TextDirBatchDataConnector in favour of FileBasedDataConnector
The documentation is also updated corresponding to the new connectors for
reference.
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Not Applicable
Author: chitralverma <[email protected]>
Closes #564 from chitralverma/deprecate-old-data-connectors.
---
griffin-doc/measure/measure-configuration-guide.md | 161 +++++++++++++++++----
.../connector/DataConnectorFactory.scala | 9 +-
.../connector/batch/AvroBatchDataConnector.scala | 3 +
.../connector/batch/FileBasedDataConnector.scala | 2 +-
.../connector/batch/MySqlDataConnector.scala | 2 +-
.../batch/TextDirBatchDataConnector.scala | 3 +
6 files changed, 145 insertions(+), 35 deletions(-)
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index ac7b5c2..d1a6c36 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -82,7 +82,7 @@ Above lists environment parameters.
- **sinks**: This field configures list of metrics sink parameters, multiple
sink ways are supported. Details of sink configuration [here](#sinks).
- **griffin.checkpoint**: This field configures list of griffin checkpoint
parameters, multiple cache ways are supported. It is only for streaming dq
case. Details of info cache configuration [here](#griffin-checkpoint).
-### <a name="sinks"></a>Sinks
+### Sinks
- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo",
"custom".
- **config**: Configure parameters of each sink type.
+ console sink (aliases: "log")
@@ -105,7 +105,7 @@ Above lists environment parameters.
User-provided data sink should be present in Spark job's class path,
by providing custom jar as -jar parameter
to spark-submit or by adding to "jars" list in
sparkProperties.json.
-### <a name="griffin-checkpoint"></a>Griffin Checkpoint
+### Griffin Checkpoint
- **type**: Griffin checkpoint type, "zk" for zookeeper checkpoint.
- **config**: Configure parameters of griffin checkpoint type.
+ zookeeper checkpoint
@@ -193,33 +193,138 @@ Above lists DQ job configure parameters.
+ rules: List of rules, to define every rule step. Details of rule
configuration [here](#rule).
- **sinks**: Whitelisted sink types for this job. Note: no sinks will be used,
if empty or omitted.
-### <a name="data-connector"></a>Data Connector
-- **type**: Data connector type: "AVRO", "HIVE", "TEXT-DIR", "CUSTOM" for
batch mode; "KAFKA", "CUSTOM" for streaming mode.
-- **version**: Version string of data connector type.
-- **config**: Configure parameters of each data connector type.
- + avro data connector
- * file.path: avro file path, optional, "" as default.
- * file.name: avro file name.
- + hive data connector
- * database: data base name, optional, "default" as default.
- * table.name: table name.
- * where: where conditions string, split by ",", optional.
- e.g. `dt=20170410 AND hour=15, dt=20170411 AND hour=15,
dt=20170412 AND hour=15`
- + text dir data connector
- * dir.path: parent directory path.
- * data.dir.depth: integer, depth of data directories, 0 as
default.
- * success.file: success file name,
- * done.file:
- + custom connector
- * class: class name for user-provided data connector
implementation. For Batch
- it should be implementing BatchDataConnector trait and have static
method with signature
- ```def apply(ctx: BatchDataConnectorContext):
BatchDataConnector```.
- For Streaming, it should be implementing StreamingDataConnector and
have static method
- ```def apply(ctx: StreamingDataConnectorContext):
StreamingDataConnector```. User-provided
- data connector should be present in Spark job's class path, by
providing custom jar as -jar parameter
- to spark-submit or by adding to "jars" list in
sparkProperties.json.
+### Data Connector
-### <a name="rule"></a>Rule
+Data Connectors help connect to external sources on which DQ checks can be applied.
+
+List of supported data connectors:
+ - Hive
+ - Kafka (Streaming only)
+ - **File based:** Parquet, Avro, ORC, CSV, TSV, Text.
+ - **JDBC based:** MySQL, PostgreSQL etc.
+ - **Custom:** Cassandra, ElasticSearch
+
+ #### Configuration
+ A sample data connector configuration is as follows,
+
+ ```
+"connectors": [
+ {
+ "type": "file",
+ "version": "1.7",
+ "config": {
+ "key1": "value1",
+ "key2": "value2"
+ }
+ }
+]
+ ```
+
+ ##### Key Parameters:
+ | Name    | Type     | Description                            | Supported Values                                 |
+ |:--------|:---------|:---------------------------------------|:-------------------------------------------------|
+ | type    | `String` | Type of the Connector                  | file, hive, kafka (streaming only), jdbc, custom |
+ | version | `String` | Version String of connector (optional) | Depends on connector type                        |
+ | config  | `Object` | Configuration params of the connector  | Depends on connector type (see below)            |
+
+ ##### For Custom Data Connectors:
+ - **config** object must contain the key **class** whose value specifies the class name of the user-provided data connector implementation.
+ + For **Batch** it should implement BatchDataConnector trait.
+ + For **Streaming** it should implement StreamingDataConnector trait.
+ - Example:
+ ```
+ "connectors": [
+ {
+ "type": "custom",
+ "config": {
+      "class": "org.apache.griffin.measure.datasource.connector.batch.CassandraDataConnector",
+ ...
+ }
+ }
+ ]
+ ```
+
+ **Note:** User-provided data connector should be present in Spark job's class path, either by providing a custom jar with the `--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
+
+ ##### For File based Data Connectors:
+
+ - Currently supported formats include Parquet, ORC, Avro, Text, and delimited types like CSV, TSV etc.
+ - Local files can also be read by prepending the `file://` scheme.
+ - **config** object supports the following keys,
+
+ | Name        | Type      | Description                                                    | Supported Values                   | Default Values |
+ |:------------|:----------|:---------------------------------------------------------------|:-----------------------------------|:---------------|
+ | format      | `String`  | type of file source                                            | parquet, avro, orc, csv, tsv, text | parquet        |
+ | paths       | `List`    | path(s) to be read                                             |                                    | `Empty`        |
+ | options     | `Object`  | format specific options                                        |                                    | `Empty`        |
+ | skipOnError | `Boolean` | whether to continue execution if one or more paths are invalid | true, false                        | false          |
+ | schema      | `List`    | given as list of key value pairs                               | See example below                  | `null`         |
+
+ - Example:
+ ```
+ "connectors": [
+ {
+ "type": "file",
+ "config": {
+ "format": "csv",
+ "paths": ["/path/to/csv/dir/*", "/path/to/dir/test.csv"],
+ "options": {
+ "header": "true"
+ },
+ "skipOnError": "false",
+      "schema": [{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
+ }
+ }
+ ]
+ ```
+
+ **Note:** Additional examples of schema:
+ - "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
+ - "schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]
+ - "schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]
+
+ ##### For Hive Data Connectors:
+ - **config** object supports the following keys,
+    * database: database name, optional, "default" as default.
+    * table.name: table name.
+    * where: where conditions string, split by ",", optional.
+      e.g. `dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15`
+
+ ##### For JDBC based Data Connectors:
+- **config** object supports the following keys,
+
+| Name      | Type     | Description                                  | Default Values        |
+|:----------|:---------|:---------------------------------------------|:----------------------|
+| database  | `String` | database name                                | default               |
+| tablename | `String` | table name to be read                        | `Empty`               |
+| url       | `String` | the connection string URL to database        | `Empty`               |
+| user      | `String` | user for connection to database              | `Empty`               |
+| password  | `String` | password for connection to database          | `null`                |
+| driver    | `String` | driver class for JDBC connection to database | com.mysql.jdbc.Driver |
+| where     | `String` | condition for reading data from table        | `Empty`               |
+
+- Example:
+ ```
+ "connectors": [
+ {
+ "type": "jdbc",
+ "config": {
+ "database": "default",
+ "tablename": "test",
+ "url": "jdbc:mysql://localhost:3306/default",
+ "user": "test_u",
+ "password": "test_p",
+ "driver": "com.mysql.jdbc.Driver",
+ "where": ""
+ }
+ }
+ ]
+ ```
+
+**Note:** The jar containing the driver class should be present in Spark job's class path, either by providing a custom jar with the `--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
+
+### Rule
- **dsl.type**: Rule dsl type, "spark-sql", "df-ops" and "griffin-dsl".
- **dq.type**: DQ type of this rule, only for "griffin-dsl" type. Supported
types: "ACCURACY", "PROFILING", "TIMELINESS", "UNIQUENESS", "COMPLETENESS".
- **out.dataframe.name** (step information): Output table name of this rule,
could be used in the following rules.
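The file-based connector documentation in the diff describes **schema** as a list of {name, type, nullable} key-value pairs. As a rough standalone illustration of what such entries carry (`Field` and `toDdl` are hypothetical helper names for this sketch, not part of Griffin), they can be rendered into a Spark-style DDL string:

```scala
// Sketch only: model one documented schema entry and render the list
// as a Spark-style DDL string such as "user_id STRING, age INT NOT NULL".
case class Field(name: String, dataType: String, nullable: Boolean)

def toDdl(fields: Seq[Field]): String =
  fields
    .map { f =>
      val base = s"${f.name} ${f.dataType.toUpperCase}"
      // non-nullable columns gain a NOT NULL constraint
      if (f.nullable) base else s"$base NOT NULL"
    }
    .mkString(", ")

val schema = Seq(
  Field("user_id", "string", nullable = true),
  Field("age", "int", nullable = false))

println(toDdl(schema)) // prints "user_id STRING, age INT NOT NULL"
```

Supplying such a schema lets the connector skip schema inference for sources like CSV, as the updated FileBasedDataConnector docs note.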
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index deb9bd8..ccc5fda 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -32,15 +32,14 @@ import org.apache.griffin.measure.datasource.connector.streaming._
object DataConnectorFactory extends Loggable {
+ @deprecated val AvroRegex: Regex = """^(?i)avro$""".r
+ @deprecated val TextDirRegex: Regex = """^(?i)text-dir$""".r
+
val HiveRegex: Regex = """^(?i)hive$""".r
- val AvroRegex: Regex = """^(?i)avro$""".r
val FileRegex: Regex = """^(?i)file$""".r
- val TextDirRegex: Regex = """^(?i)text-dir$""".r
-
val KafkaRegex: Regex = """^(?i)kafka$""".r
-
- val CustomRegex: Regex = """^(?i)custom$""".r
val JDBCRegex: Regex = """^(?i)jdbc$""".r
+ val CustomRegex: Regex = """^(?i)custom$""".r
/**
* create data connector
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index dcedf48..d069356 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -28,6 +28,9 @@ import org.apache.griffin.measure.utils.ParamUtil._
/**
* batch data connector for avro file
*/
+@deprecated(
+  s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}'.",
+ "0.6.0")
case class AvroBatchDataConnector(
@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
index 086596b..3fbd73c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -42,7 +42,7 @@ import org.apache.griffin.measure.utils.ParamUtil._
* - format : [[String]] specifying the type of file source (parquet, orc, etc.).
* - paths : [[Seq]] specifying the paths to be read
* - options : [[Map]] of format specific options
- * - skipOnError : [[Boolean]] specifying where to continue execution if one or more paths are invalid.
+ * - skipOnError : [[Boolean]] specifying whether to continue execution if one or more paths are invalid.
* - schema : [[Seq]] of {colName, colType and isNullable} given as key value pairs. If provided, this can
* help skip the schema inference step for some underlying data sources.
*
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index 31502b2..feacfc9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
@deprecated(
-  "This class is deprecated. Use 'org.apache.griffin.measure.datasource.connector.batch.JDBCBasedDataConnector'",
+  s"This class is deprecated. Use '${classOf[JDBCBasedDataConnector].getCanonicalName}'.",
"0.6.0")
case class MySqlDataConnector(
@transient sparkSession: SparkSession,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 35bcaa3..946ff7c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -28,6 +28,9 @@ import org.apache.griffin.measure.utils.ParamUtil._
/**
* batch data connector for directory with text format data in the nth depth sub-directories
*/
+@deprecated(
+  s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}'.",
+ "0.6.0")
case class TextDirBatchDataConnector(
@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
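The DataConnectorFactory hunk above reorders the case-insensitive type regexes and marks the avro/text-dir ones deprecated. The dispatch behavior of those regex values (copied from the diff) can be sketched standalone; `matchType` is an illustrative helper for this sketch, not Griffin API, and the deprecated regexes are deliberately omitted:

```scala
import scala.util.matching.Regex

// Regex values as they appear in the patched DataConnectorFactory.
val HiveRegex: Regex = """^(?i)hive$""".r
val FileRegex: Regex = """^(?i)file$""".r
val KafkaRegex: Regex = """^(?i)kafka$""".r
val JDBCRegex: Regex = """^(?i)jdbc$""".r
val CustomRegex: Regex = """^(?i)custom$""".r

// Scala regex patterns in a match expression succeed only when the
// whole string matches, so "FILE", "file", and "File" all hit FileRegex.
def matchType(conType: String): String = conType match {
  case HiveRegex()   => "hive"
  case FileRegex()   => "file"
  case KafkaRegex()  => "kafka"
  case JDBCRegex()   => "jdbc"
  case CustomRegex() => "custom"
  case _             => "unknown"
}

println(matchType("FILE")) // prints "file"
println(matchType("Jdbc")) // prints "jdbc"
println(matchType("avro")) // prints "unknown" (AvroRegex omitted in this sketch)
```

In the real factory the deprecated AvroRegex and TextDirRegex still resolve, so existing "avro" and "text-dir" configs keep working during the deprecation period.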