This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 138f2107e9 [INLONG-583][Doc] Support dynamic topic and dynamic 
partition for KafkaLoadNode (#587)
138f2107e9 is described below

commit 138f2107e9290729d5e0a8dc984ab70a4610f20c
Author: yunqingmoswu <[email protected]>
AuthorDate: Mon Nov 7 17:29:34 2022 +0800

    [INLONG-583][Doc] Support dynamic topic and dynamic partition for 
KafkaLoadNode (#587)
---
 docs/data_node/load_node/kafka.md                  | 116 +++++++++++++++++++-
 .../current/data_node/load_node/kafka.md           | 118 ++++++++++++++++++++-
 2 files changed, 232 insertions(+), 2 deletions(-)

diff --git a/docs/data_node/load_node/kafka.md 
b/docs/data_node/load_node/kafka.md
index 7a14a88eb5..b683df182e 100644
--- a/docs/data_node/load_node/kafka.md
+++ b/docs/data_node/load_node/kafka.md
@@ -83,6 +83,8 @@ TODO: It will be supported in the future.
 |---------|----------|---------|------|------------|
 | connector | required | (none) | String | Specify which connector to use, 
valid values are:  1. for the Upsert Kafka use: `upsert-kafka-inlong`  2. for 
normal Kafka use: `kafka-inlong` |
 | topic | required | (none) | String | Topic name(s) to read data from when 
the table is used as source. It also supports  topic list for source by 
separating topic by semicolon like `topic-1;topic-2`. Note, only one of 
`topic-pattern` and `topic` can be specified for sources. |
+| topic-pattern | optional | (none) | String | Dynamic topic extraction 
pattern, like '${VARIABLE_NAME}', which is only used in kafka multiple sink 
scenarios and is valid when 'format' is 'raw'. |
+| sink.multiple.format | optional | (none) | String | Format of kafka raw 
data, currently only supports [canal-json&#124;debezium-json] which is only 
used in kafka multiple sink scenarios and is valid when 'format' is 'raw'.  |
 | properties.bootstrap.servers | required | (none) | String | Comma separated 
list of Kafka brokers. |
 | properties.* | optional | (none) | String | This can set and pass arbitrary 
Kafka configurations. Suffix names must match the configuration key defined in 
[Kafka Configuration 
documentation](https://kafka.apache.org/documentation/#configuration). Flink 
will remove the `properties.` key prefix and pass the transformed key and 
values to the underlying KafkaClient. For example, you can disable automatic 
topic creation via `properties.allow.auto.create.topics` = `false`. But there 
are some [...]
 | format | required for normal Kafka | (none) | String | The format used to 
deserialize and serialize the value part of Kafka messages. Please refer to the 
formats page for more details and more 
[format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/)
 options. Note: Either this option or the `value.format` option are required. |
@@ -91,7 +93,8 @@ TODO: It will be supported in the future.
 | key.fields-prefix | optional | (none) | String | Defines a custom prefix for 
all fields of the key format to avoid name clashes with fields of the value 
format. By default, the prefix is empty. If a custom prefix is defined, both 
the table schema and 'key.fields' will work with prefixed names. When 
constructing the data type of the key format, the prefix will be removed and 
the non-prefixed names will be used within the key format. Please note that 
this option requires that 'value.fiel [...]
 | value.format | required for upsert Kafka | (none) | String | The 
[format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/)
 used to deserialize and serialize the value part of Kafka messages. Please 
refer to the formats page for more details and more format options. |
 | value.fields-include | optional | ALL | Enum Possible values: [ALL, 
EXCEPT_KEY]| Defines a strategy how to deal with key columns in the data type 
of the value format. By default, 'ALL' physical columns of the table schema 
will be included in the value format which means that key columns appear in the 
data type for both the key and value format |
-| sink.partitioner | optional | 'default' | String | Output partitioning from 
Flink's partitions into Kafka's partitions. Valid values are <br/>`default`: 
use the kafka default partitioner to partition records. <br/>`fixed`: each 
Flink partition ends up in at most one Kafka partition. <br/>`round-robin`: a 
Flink partition is distributed to Kafka partitions sticky round-robin. It only 
works when record's keys are not specified. Custom FlinkKafkaPartitioner 
subclass: e.g. 'org.mycompany.My [...]
+| sink.partitioner | optional | 'default' | String | Output partitioning from 
Flink's partitions into Kafka's partitions. Valid values are <br/>`default`: 
use the kafka default partitioner to partition records. <br/>`fixed`: each 
Flink partition ends up in at most one Kafka partition. <br/>`round-robin`: a 
Flink partition is distributed to Kafka partitions sticky round-robin. 
<br/>raw-hash: Extract value based on 'sink.multiple.partition-pattern' to 
'hash' as the final partition, which i [...]
+| sink.multiple.partition-pattern | optional | (none) | String |  Dynamic 
partition extraction pattern, like '${VARIABLE_NAME}' which is only used in 
kafka multiple sink scenarios and is valid when 'format' is 'raw'. |
 | sink.semantic | optional | at-least-once | String | Defines the delivery 
semantic for the Kafka sink. Valid enumerationns are 'at-least-once', 
'exactly-once' and 'none'. See [Consistency 
guarantees](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#consistency-guarantees)
 for more details. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of 
the Kafka sink operator. By default, the parallelism is determined by the 
framework using the same parallelism of the upstream chained operator. |
 | inlong.metric.labels | optional | (none) | String | Inlong metric label, 
format of value is groupId=xxgroup&streamId=xxstream&nodeId=xxnode. |
@@ -102,6 +105,117 @@ It supports write metadata for format `canal-json-inlong`.
 
 See the [Kafka Extract Node](../extract_node/kafka.md) for a list of all 
available metadata fields.
 
+## Features
+
+### Support Dynamic Schema Writing
+
+Dynamic schema writing supports dynamic extraction of topic and partition from 
data and writes to the corresponding topic
+and partition. In order to support dynamic schema writing, you need to set the 
format of Kafka to 'raw',
+Also need to set the serialization format of the upstream data (via the option 
'sink.multiple.format'
+to set, currently only supports [canal-json|debezium-json]).
+
+#### Dynamic Topic Extraction
+
+Dynamic topic extraction is by parsing the topic pattern and extracting the 
topic from the data.
+In order to support dynamic extraction of topic, you need to set the option 
'topic-pattern', Kafka Load Node will parse 'topic-pattern' as the final topic,
+If parsing fails, it will be written to the default topic set via 'topic'. 
'topic-pattern' supports constants and variables, constants are string 
constants,
+variables are strictly represented by '${VARIABLE_NAME}', and the value of the 
variable comes from the data itself, that is, through 'sink.multiple.format'
+a metadata field of a specified Format, or a physical field in the data.
+
+Examples of 'topic-parttern' are as follows:
+- 'sink.multiple.format' is 'canal-json':
+
+The upstream data is:
+```
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+} 
+```
+'topic-pattern' is '{database}_${table}', and the extracted topic is 
'inventory_products' ('database', 'table' are metadata fields, and 'id' are 
physical fields)
+
+'topic-pattern' is '{database}_${table}_${id}', and the extracted topic is 
'inventory_products_4' ('database', 'table' are metadata fields, and 'id' are 
physical fields)
+
+- 'sink.multiple.format' is 'debezium-json':
+
+The upstream data is:
+```
+{
+  "before": {
+    "id": 4,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 4,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "source": {
+    "db": "inventory",
+    "table": "products"
+  },
+  "op": "u",
+  "ts_ms": 1589362330904,
+  "transaction": null
+}
+```
+'topic-pattern' is '{source.db}_${source.table}', and the extracted topic is 
'inventory_products' ('source.db', 'source.table' are metadata fields, and 'id' 
are physical fields)
+
+'topic-pattern' is '{source.db}_${source.table}_${id}', and the extracted 
topic is 'inventory_products_4' ('source.db', 'source.table' are metadata 
fields, and 'id' are physical fields)
+
+#### Dynamic Partition Extraction
+
+Dynamic partition extraction is to extract Partition from data by parsing 
partition pattern, which is similar to dynamic topic extraction.
+To support dynamic extraction of topics, you need to set the option 
'sink.partitioner' to 'raw-hash'
+and option 'sink.multiple.partition-pattern', Kafka Load Node will parse 
'sink.multiple.partition-pattern'
+as the partition key, hash the partition key and take the remainder of the 
partition size as the final partition,
+If parsing fails, it will return null and execute Kafka's default partitioning 
strategy. 'sink.multiple.partition-pattern'
+support constants, variables and primary keys. Constants are string constants. 
Variables are strictly represented by '${VARIABLE_NAME}', the value of the 
variable comes from the data itself,
+that is, it can be a metadata field of a format specified by 
'sink.multiple.format', or it can be a physical field in the data.
+The primary key is a special constant 'PRIMARY_KEY', which extracts the 
primary key value of the record based on a certain format data format.
+
+Notes: Kafka dynamic partition extraction based on 'PRIMARY_KEY' has a 
limitation that the primary key information needs to be specified in the data,
+For example, if Format is 'canal-json', then its primary key Key is 'pkNames'. 
In addition, because format 'debezium-json' has no definition of primary key, 
here
+we agree that the primary key of 'debezium-json' is also 'pkNames' and is 
included in 'source' like other metadata fields such as 'table' and 'db',
+If partitioning by primary key is used, and the format is 'debezium-json', you 
need to ensure that the real data meets the above conventions.
+
 ## Data Type Mapping
 
 Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or 
data types. The Kafka messages are deserialized and serialized by formats, e.g. 
csv, json, avro. Thus, the data type mapping is determined by specific formats. 
Please refer to 
[Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/)
 pages for more details.
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
index 37ce5f30d3..a0c66fc45a 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
@@ -80,6 +80,8 @@ TODO: 将在未来支持此功能。
 |---------|----------|---------|------|------------|
 | connector | 必选 | (none) | String | 指定要使用的连接器  1. Upsert Kafka 连接器使用: 
`upsert-kafka-inlong`  2. Kafka连接器使用: `kafka-inlong` |
 | topic | 必选 | (none) | String | 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 
列表,如 `topic-1;topic-2`。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
+| topic-pattern | 可选 | (none) | String | 动态 Topic 提取模式, 形如 '${VARIABLE_NAME}', 
仅用于 Kafka 多 Sink 场景且当 'format' 为 'raw' 时有效。 |
+| sink.multiple.format | 可选 | (none) | String | Kafka 原始数据的 Format, 目前仅支持 
[canal-json&#124;debezium-json] 仅用于 Kafka 多 Sink 场景且当 'format' 为 'raw' 时有效。 |
 | properties.bootstrap.servers | 必选 | (none) | String | 逗号分隔的 Kafka broker 列表。 
|
 | properties.* | 可选 | (none) | String | 可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 [Kafka 
配置文档](https://kafka.apache.org/documentation/#configuration) 中定义的配置键。Flink 将移除 
"properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 
'properties.allow.auto.create.topics' = 'false' 来禁用 topic 
的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer' 和 
'value.deserializer'。 |
 | format | 对于 Kafka 必选 | (none) | String | 用来序列化或反序列化 Kafka 消息的格式。 请参阅 
[格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/)
 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'value.format' 二者必需其一。 |
@@ -88,17 +90,131 @@ TODO: 将在未来支持此功能。
 | key.fields-prefix | 可选 | (none) | String | 
为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 
'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 
'value.fields-include' 配置为 'EXCEPT_KEY'。 |
 | value.format | 必选 for upsert Kafka | (none) | String | 用于对 Kafka 消息中 value 
部分序列化和反序列化的格式。支持的格式包括 
'csv'、'json'、'avro'。请参考[格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/)
 页面以获取更多详细信息和格式参数。 |
 | value.fields-include | 可选 | ALL | Enum Possible values: [ALL, EXCEPT_KEY]| 
控制哪些字段应该出现在 value 中。可取值:<br/> ALL:消息的 value 部分将包含 schema 
中所有的字段,包括定义为主键的字段。<br/> EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 |
-| sink.partitioner | 可选 | 'default' | String | Flink partition 到 Kafka 
partition 的分区映射关系,可选值有:<br/>default:使用 Kafka 默认的分区器对消息进行分区。<br/>fixed:每个 Flink 
partition 最终对应最多一个 Kafka partition。<br/>round-robin:Flink partition 
按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。<br/>自定义 
FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'。请参阅 [Sink 
分区](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#sink-%E5%88%86%E5%8C%BA)
 以获取更多细节。 |
+| sink.partitioner | 可选 | 'default' | String | Flink partition 到 Kafka 
partition 的分区映射关系,可选值有:<br/>default:使用 Kafka 默认的分区器对消息进行分区。<br/>fixed:每个 Flink 
partition 最终对应最多一个 Kafka partition。<br/>round-robin:Flink partition 
按轮循(round-robin)的模式对应到 Kafka partition。<br/>raw-hash: 基于 
'sink.multiple.partition-pattern' 提取值作 'hash' 以确定最终的分区, 仅用于 Kafka 多 Sink 场景且当 
'format' 为 'raw' 时有效。只有当未指定消息的消息键时生效。<br/>自定义 FlinkKafkaPartitioner 的子类:例如 
'org.mycompany.MyPartitioner'。请参阅 [Sink 分区](https://nightlies.ap [...]
+| sink.multiple.partition-pattern | 可选 | (none) | String |  动态 Partition 提取模式, 
形如 '${VARIABLE_NAME}'仅用于 Kafka 多 Sink 场景且当 'format' 为 'raw'、'sink.partitioner' 
为 'raw-hash' 时有效。 |
 | sink.semantic | 可选 | at-least-once | String | 定义 Kafka sink 的语义。有效值为 
'at-least-once','exactly-once' 和 'none'。请参阅 
[一致性保证](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#%E4%B8%80%E8%87%B4%E6%80%A7%E4%BF%9D%E8%AF%81)
 以获取更多细节。 |
 | sink.parallelism | 可选 | (none) | Integer | 定义 Kafka sink 
算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
 | inlong.metric | 可选 | (none) | String | inlong metric 
的标签值,该值的构成为groupId&streamId&nodeId。|
 
+
 ## 可用的元数据字段
 
 支持为格式 `canal-json-inlong`写元数据。
 
 参考 [Kafka Extract Node](../extract_node/kafka.md) 关于元数据的列表。
 
+## 特征
+
+### 支持动态 Schema 写入
+
+动态 Schema 写入支持从数据中动态提取 Topic 和 Partition, 并写入到对应的 Topic
+和 Partition。为了支持动态 Schema 写入,需要设置 Kafka 的 Format 格式为 'raw', 
+同时需要设置上游数据的序列化格式(通过选项 'sink.multiple.format' 
+来设置, 目前仅支持 [canal-json|debezium-json])。
+
+#### 动态 Topic 提取
+
+动态 Topic 提取即通过解析 Topic Pattern 并从数据中提取 Topic 。
+为了支持动态提取 Topic, 需要设置选项 'topic-pattern', Kafka Load Node 会解析 'topic-pattern' 
作为最终的 Topic, 
+如果解析失败, 会写入通过 'topic' 设置的默认 Topic 中。'topic-pattern' 支持常量和变量,常量就是字符串常量, 
+变量是严格通过 '${VARIABLE_NAME}' 来表示, 变量的取值来自于数据本身, 即可以是通过 'sink.multiple.format' 
+指定的某种 Format 的元数据字段, 也可以是数据中的物理字段。
+
+关于 'topic-parttern' 的例子如下:
+- 'sink.multiple.format' 为 'canal-json':
+
+上游数据为:
+```
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+} 
+```
+'topic-pattern' 为 '{database}_${table}', 提取后的 Topic 为 'inventory_products' 
('database', 'table' 为元数据字段, 'id' 为物理字段)
+
+'topic-pattern' 为 '{database}_${table}_${id}', 提取后的 Topic 为 
'inventory_products_4' ('database', 'table' 为元数据字段, 'id' 为物理字段)
+
+- 'sink.multiple.format' 为 'debezium-json':
+
+上游数据为:
+```
+{
+  "before": {
+    "id": 4,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 4,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "source": {
+    "db": "inventory",
+    "table": "products"
+  },
+  "op": "u",
+  "ts_ms": 1589362330904,
+  "transaction": null
+}
+```
+'topic-pattern' 为 '{database}_${table}', 提取后的 Topic 为 'inventory_products' 
('source.db', 'source.table' 为元数据字段, 'id' 为物理字段)
+
+'topic-pattern' 为 '{database}_${table}_${id}', 提取后的 Topic 为 
'inventory_products_4' ('source.db', 'source.table' 为元数据字段, 'id' 为物理字段)
+
+#### 动态 Partition 提取
+
+动态 Partition 提取即通过解析 Partition Pattern 并从数据中提取 Partition, 这和动态 Topic 提取类似。
+为了支持动态提取 Topic, 需要设置选项 'sink.partitioner' 为 'raw-hash' 
+和选项 'sink.multiple.partition-pattern', Kafka Load Node 会解析 
'sink.multiple.partition-pattern' 
+作为 Partition key, 并对 Partition key 进行 Hash 和对 Partition Size 取余以确定最终 Partition,
+如果解析失败, 会返回 null 并执行 Kafka 默认的分区策略。'sink.multiple.partition-pattern' 
+支持常量、变量和主键,常量就是字符串常量, 变量是严格通过 ${VARIABLE_NAME} 来表示, 变量的取值来自于数据本身,
+即可以是通过 'sink.multiple.format' 指定的某种 Format 的元数据字段, 也可以是数据中的物理字段,
+主键是一种特殊的常量 'PRIMARY_KEY', 基于某种 Format 的数据格式下来提取该条记录的主键值。
+
+注意: 基于 'PRIMARY_KEY' 的 Kafka 动态 Partition 提取, 有一个限制, 即需要在数据中指定主键信息, 
+例如 Format 为 'canal-json', 则其主键 Key 为 'pkNames'。另外由于 Format 'debezium-json' 
无主键的定义, 对此
+我们约定 'debezium-json' 的主键 Key 也为 'pkNames' 且和其他元数据字段如 'table'、'db' 一样包含在 
'source'中,
+如果用到了按主键分区, 且 Format 为 'debezium-json', 需要确保真实数据满足上述约定。
+
+
 ## 数据类型映射
 
 Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 
csv,json,avro。 因此,数据类型映射取决于使用的格式。请参阅 
[格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/)
 页面以获取更多细节。

Reply via email to