yunqingmoswu commented on code in PR #381: URL: https://github.com/apache/incubator-inlong-website/pull/381#discussion_r879425023
##########
docs/modules/sort/dataflow_example.md:
##########
@@ -5,307 +5,1717 @@ sidebar_position: 3
# Examples
-To make it easier for you to create inlong-sort jobs, here we list some
dataflow configuration examples.
+To make it easier for you to create InLong-Sort jobs, here we list some
dataflow configuration examples.
-## Pulsar to Kafka
+## MySQL to Kafka
-Normal example:
+- Single table or sub-database sub-table sync example:
```json
{
- "id": 1,
- "source_info": {
- "type": "pulsar",
- "admin_url": "YOUR_PULSAR_ADMIN_URL",
- "service_url": "YOUR_PULSAR_SERVICE_URL",
- "topic": "YOUR_PULSAR_TOPIC",
- "subscription_name": "debezium2canal",
- "deserialization_info": {
- "type": "debezium_json",
- "ignore_parse_errors": true,
- "timestamp_format_standard": "ISO_8601"
- },
- "fields": [
- {
- "type": "base",
- "name": "name",
- "format_info": {
- "type": "string"
+ "groupId":"1",
+ "streams":[
+ {
+ "streamId":"1",
+ "nodes":[
+ {
+ "type":"mysqlExtract",
+ "id":"1",
+ "name":"mysql_input",
+ "fields":[
+ {
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ },
+ {
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ {
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ }
+ ],
+ "properties":{
+ "append-mode":"true"
+ },
+ "primaryKey":"id",
+ "tableNames":[
+ "YOUR_TABLE"
+ ],
+ "hostname":"YOUR_MYSQL_HOST",
+ "username":"YOUR_USERNAME",
+ "password":"YOUR_PASSWORD",
+ "database":"YOUR_DATABASE",
+ "port":3306,
+ "incrementalSnapshotEnabled":true
+ },
+ {
+ "type":"kafkaLoad",
+ "id":"2",
+ "name":"kafka_output",
+ "fields":[
+ {
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ },
+ {
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ {
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ }
+ ],
+ "fieldRelationShips":[
+ {
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ }
+ },
+ {
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ }
+ },
+ {
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ }
+ }
+ ],
+ "topic":"YOUR_TOPIC",
+ "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
+ "format":{
+ "type":"jsonFormat",
+ "failOnMissingField":false,
+ "ignoreParseErrors":true,
+ "timestampFormatStandard":"SQL",
+ "mapNullKeyMode":"DROP",
+ "mapNullKeyLiteral":"null",
+ "encodeDecimalAsPlainNumber":true
+ },
+ "primaryKey":"id"
}
- },
- {
- "type": "base",
- "name": "age",
- "format_info": {
- "type": "int"
+ ],
+ "relations":[
+ {
+ "type":"baseRelation",
+ "inputs":[
+ "1"
+ ],
+ "outputs":[
+ "2"
+ ]
}
+ ]
+ }
+ ]
+}
+```
+
+- Whole-database migration example:
+
+```json
+{
+ "groupId":"1",
+ "streams":[
+ {
+ "streamId":"1",
+ "nodes":[
+ {
+ "type":"mysqlExtract",
+ "id":"1",
+ "name":"mysql_input",
+ "fields":[
+ {
+ "type":"builtin",
+ "name":"data",
+ "formatInfo":{
+ "type":"string"
+ },
+ "builtinField":"MYSQL_METADATA_DATA"
}
- ],
- "authentication": null
- },
- "sink_info": {
- "type": "kafka",
- "fields": [
+ ],
+ "properties":{
+ "append-mode":"true",
+ "migrate-all":"true"
+ },
+ "tableNames":[
+ "[\\s\\S]*.*"
+ ],
+ "hostname":"YOUR_MYSQL_HOST",
+ "username":"YOUR_USERNAME",
+ "password":"YOUR_PASSWORD",
+ "database":"[\\s\\S]*.*",
+ "port":3306,
+ "incrementalSnapshotEnabled":false
+ },
+ {
+ "type":"kafkaLoad",
+ "id":"2",
+ "name":"kafka_output",
+ "fields":[
{
- "type": "base",
- "name": "name",
- "format_info": {
- "type": "string"
- }
- },
+ "type":"base",
+ "name":"data",
+ "formatInfo":{
+ "type":"string"
+ }
+ }
+ ],
+ "fieldRelationShips":[
{
- "type": "base",
- "name": "age",
- "format_info": {
- "type": "int"
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"data",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"data",
+ "formatInfo":{
+ "type":"string"
}
+ }
}
- ],
- "address": "YOUR_KAFKA_ADDRESS",
- "topic": "sort_test_canal",
- "serialization_info": {
- "type": "canal"
+ ],
+ "topic":"YOUR_TOPIC",
+ "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
+ "format":{
+ "type":"csvFormat",
+ "fieldDelimiter":",",
+ "disableQuoteCharacter":true,
+ "quoteCharacter":null,
+ "allowComments":false,
+ "ignoreParseErrors":true,
+ "arrayElementDelimiter":";",
+ "escapeCharacter":null,
+ "nullLiteral":null
+ }
}
- },
- "properties": {
- "consumer.bootstrap-mode": "earliest",
- "transaction.timeout.ms": 900000
+ ],
+ "relations":[
+ {
+ "type":"baseRelation",
+ "inputs":[
+ "1"
+ ],
+ "outputs":[
+ "2"
+ ]
+ }
+ ]
}
+ ]
}
```
-Whole-database migration example:
+## Kafka to Hive
```json
{
- "id": 123,
- "source_info": {
- "type": "pulsar",
- "admin_url": "YOUR_PULSAR_ADMIN_URL",
- "service_url": "YOUR_PULSAR_SERVICE_URL",
- "topic": "YOUR_PULSAR_TOPIC",
- "subscription_name": "whole-db-migration",
- "deserialization_info": {
- "type": "debezium_json",
- "ignore_parse_errors": false,
- "timestamp_format_standard": "ISO_8601",
- "include_update_before": true
- },
- "fields": [
- {
- "type": "builtin",
- "name": "db",
- "format_info": {
- "type": "string"
- },
- "builtin_field": "MYSQL_METADATA_DATABASE"
- },
- {
- "type": "builtin",
- "name": "table",
- "format_info": {
- "type": "string"
- },
- "builtin_field": "MYSQL_METADATA_TABLE"
- },
- {
- "type": "builtin",
- "name": "mydata",
- "format_info": {
- "type": "string"
- },
- "builtin_field": "MYSQL_METADATA_DATA"
- },
- {
- "type": "builtin",
- "name": "es",
- "format_info": {
- "type": "long"
- },
- "builtin_field": "MYSQL_METADATA_EVENT_TIME"
- },
- {
- "type": "builtin",
- "name": "isDdl",
- "format_info": {
- "type": "boolean"
+ "groupId":"1",
+ "streams":[
+ {
+ "streamId":"1",
+ "nodes":[
+ {
+ "type":"kafkaExtract",
+ "id":"1",
+ "name":"kafka_input",
+ "fields":[
+ {
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ },
+ {
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ {
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ }
+ ],
+ "topic":"YOUR_TOPIC",
+ "bootstrapServers":"YOUR_KAFKA_BOOTSTRAP_SERVERS",
+ "format":{
+ "type":"jsonFormat",
+ "failOnMissingField":false,
+ "ignoreParseErrors":true,
+ "timestampFormatStandard":"SQL",
+ "mapNullKeyMode":"DROP",
+ "mapNullKeyLiteral":"null",
+ "encodeDecimalAsPlainNumber":true
+ },
+ "scanStartupMode":"EARLIEST_OFFSET"
},
- "builtin_field": "MYSQL_METADATA_IS_DDL"
- },
- {
- "type": "builtin",
- "name": "type",
- "format_info": {
- "type": "string"
+ {
+ "type":"hiveLoad",
+ "id":"2",
+ "name":"hive_output",
+ "fields":[
+ {
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ },
+ {
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ {
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ }
+ ],
+ "fieldRelationShips":[
+ {
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"id",
+ "formatInfo":{
+ "type":"long"
+ }
+ }
+ },
+ {
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"name",
+ "formatInfo":{
+ "type":"string"
+ }
+ }
+ },
+ {
+ "type":"fieldRelationShip",
+ "inputField":{
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ },
+ "outputField":{
+ "type":"base",
+ "name":"age",
+ "formatInfo":{
+ "type":"int"
+ }
+ }
+ }
+ ],
+ "sinkParallelism":1,
+ "catalogName":"hivecatlog",
+ "database":"YOUR_DATABASE",
+ "tableName":"YOUR_TABLE_NAME",
+ "hiveConfDir":"YOUR_HIVE_CONF_DIR",
+ "hiveVersion":"3.1.2",
+ "hadoopConfDir":"YOUR_HADOOP_CONF_DIR"
+ }
+ ],
+ "relations":[
+ {
+ "type":"baseRelation",
+ "inputs":[
+ "1"
+ ],
+ "outputs":[
+ "2"
+ ]
+ }
+ ]
+ }
+ ]
+}
+```
+
+## Transform examples
+
+Currently supported transforms include string split, string regex replace, string regex
replace of the first matched value, data distinct, data filter, and regular join.
Review Comment:
It is a good idea
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
