yunqingmoswu commented on code in PR #392:
URL:
https://github.com/apache/incubator-inlong-website/pull/392#discussion_r895282128
##########
docs/data_node/extract_node/kafka.md:
##########
@@ -3,6 +3,157 @@ title: Kafka
sidebar_position: 4
---
-## Configuration
-the Dashboard has not supported extracting data from Kafka for this version,
-you can create Kafka data streams from the background via the [Command-line
Tools](user_guide/command_line_tools.md).
\ No newline at end of file
+## Kafka Extract Node
+
+The `Kafka Extract Node` supports to read data from Kafka topics. It can
support read data in the normal fashion and read data in the
+upsert fashion. The `upsert-kafka` connector produces a `changelog stream`,
where each data record represents an `update` or
+`delete` event. The `kafka-inlong` connector can read data and metadata.
+
+## Supported Version
+
+| Extract Node | Kafka version |
+|-----------------------------|---------------|
+| [Kafka](./kafka.md) | universal |
+
+## Dependencies
+
+In order to set up the `Kafka Extract Node`, the following provides dependency
information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort
Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-kafka</artifactId>
+ <!-- Choose the version that suits your application -->
+ <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a Kafka Extract Node
+
+### Usage for SQL API
+
+The example below shows how to create a Kafka Extract Node with `Flink SQL` :
+* connector is `kafka-inlong`
+```sql
+-- Set checkpoint every 3000 milliseconds
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+
+-- Create a Kafka table 'kafka_extract_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_extract_node (
+ `id` INT,
+ `name` STRINTG
+ ) WITH (
+ 'connector' = 'kafka-inlong',
+ 'topic' = 'user',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'csv'
+ )
+
+-- Read data
+Flink SQL> SELECT * FROM kafka_extract_node;
+```
+* connector is `upsert-kafka`
+```sql
+-- Set checkpoint every 3000 milliseconds
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+
+-- Create a Kafka table 'kafka_extract_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_extract_node (
+ `id` INT,
+ `name` STRINTG,
+ PRIMARY KEY (`id`) NOT ENFORCED
+ ) WITH (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'user',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv'
+ )
+
+-- Read data
+Flink SQL> SELECT * FROM kafka_extract_node;
+```
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## Kafka Extract Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify which connector to use,
valid values are: 1. for the Upsert Kafka use: `upsert-kafka'` 2. for normal
Kafka use: `kafka-inlong` |
+| topic | optional | (none) | String | Topic name(s) to read data from when
the table is used as source. It also supports topic list for source by
separating topic by semicolon like `topic-1;topic-2`. Note, only one of
`topic-pattern` and `topic` can be specified for sources. |
+| topic-pattern | optional | (none) | String | The regular expression for a
pattern of topic names to read from. All topics with names that match the
specified regular expression will be subscribed by the consumer when the job
starts running. Note, only one of `topic-pattern` and `topic` can be specified
for sources. |
+| properties.bootstrap.servers | required | (none) | String | Comma separated
list of Kafka brokers. |
+| properties.group.id | required | (none) | String | The id of the consumer
group for Kafka source. |
+| properties.* | optional | (none) | String | This can set and pass arbitrary
Kafka configurations. Suffix names must match the configuration key defined in
Kafka Configuration documentation. Flink will remove the `properties.` key
prefix and pass the transformed key and values to the underlying KafkaClient.
For example, you can disable automatic topic creation via
`properties.allow.auto.create.topics` = `false`. But there are some
configurations that do not support to set, because Flink will override them,
e.g. `key.deserializer` and `value.deserializer`. |
+| format | required for normal kafka | (none) | String | The format used to
deserialize and serialize the value part of Kafka messages. Please refer to the
formats page for more details and more
[format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/)
options. Note: Either this option or the `value.format` option are required. |
+| key.format | optional | (none) | String | The format used to deserialize and
serialize the key part of Kafka messages. Please refer to the formats page for
more details and more format options. Note: If a key format is defined, the
'key.fields' option is required as well. Otherwise the Kafka records will have
an empty key. |
+| key.fields | optional | [] | `List<String>` | Defines an explicit list of
physical columns from the table schema that configure the data type for the key
format. By default, this list is empty and thus a key is undefined. The list
should look like 'field1;field2'. |
+| key.fields-prefix | optional | (none) | String | Defines a custom prefix for
all fields of the key format to avoid name clashes with fields of the value
format. By default, the prefix is empty. If a custom prefix is defined, both
the table schema and 'key.fields' will work with prefixed names. When
constructing the data type of the key format, the prefix will be removed and
the non-prefixed names will be used within the key format. Please note that
this option requires that 'value.fields-include' must be set to 'EXCEPT_KEY'. |
+| value.fields-include | optional | ALL | Enum Possible values: [ALL,
EXCEPT_KEY]| Defines a strategy how to deal with key columns in the data type
of the value format. By default, 'ALL' physical columns of the table schema
will be included in the value format which means that key columns appear in the
data type for both the key and value format |
+| scan.startup.mode | optional | group-offsets | String | Startup mode for
Kafka consumer, valid values are 'earliest-offset', 'latest-offset',
'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start
Reading Position for more details. |
+| scan.startup.specific-offsets | optional | (none) | String | Specify offsets
for each partition in case of 'specific-offsets' startup mode, e.g.
'partition:0,offset:42;partition:1,offset:300'. |
+| scan.startup.timestamp-millis | optional | (none) | Long | Start from the
specified epoch timestamp (milliseconds) used in case of 'timestamp' startup
mode. |
+| scan.topic-partition-discovery.interval | optional | (none) | Duration |
Interval for consumer to discover dynamically created Kafka topics and
partitions periodically. |
+
+## Available Metadata
+
+The following format metadata can be exposed as read-only (VIRTUAL) columns in
a table definition. It supports read metadata for format `canal-json-inlong`.
+
+| key | Data Type | Description |
+|-----|------------|-------------|
+| value.table_name | STRING | Name of the table that contain the row |
+| value.database_name | STRING | Name of the database that contain the row |
+| value.op_ts| TIMESTAMP(3) | It indicates the time that the change was made
in the database. If the record is read from snapshot of the table instead of
the binlog, the value is always 0 |
+| value.op_type| STRING | Operation type, INSERT/UPDATE/DELETE |
+| value.batch_id| BIGINT | Not important, a simple increment counter |
+| value.is_ddl| BOOLEAN | Source does not emit ddl data, value is false |
+| value.update_before| `ARRAY<MAP<STRING, STRING>>` | The update-before data
for UPDATE record |
+| value.mysql_type | MAP<STRING, STRING> | MySQL field type |
+| value.pk_names | `ARRAY<STRING>` | Primary key |
+| value.sql_type | MAP<STRING, INT> | SQL field type |
+| value.ts | TIMESTAMP_LTZ(3) | The ts_ms field is used to store the
information about the local time at which the connector processed/generated the
event |
+
+The extended CREATE TABLE example demonstrates the syntax for exposing these
metadata fields:
+
+```sql
+CREATE TABLE `kafka_extract_node` (
+ `id` INT,
+ `name` STRING,
+ `database_name` string METADATA FROM 'value.database_name',
+ `table_name` string METADATA FROM 'value.table_name',
+ `op_ts` timestamp(3) METADATA FROM 'value.op_ts',
+ `op_type` string METADATA FROM 'value.op_type',
+ `batch_id` bigint METADATA FROM 'value.batch_id',
+ `is_ddl` boolean METADATA FROM 'value.is_ddl',
+ `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM
'value.update_before',
+ `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
+ `pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
+ `data` STRING METADATA FROM 'value.data',
+ `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
+ `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts',
+) WITH (
+ 'connector' = 'kafka-inlong',
+ 'topic' = 'user',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'canal-json-inlong'
+)
+```
+
+## Data Type Mapping
+
+Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or
data types. The Kafka messages are deserialized and serialized by formats, e.g.
csv, json, avro. Thus, the data type mapping is determined by specific formats.
Please refer to
[Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/)
pages for more details.
Review Comment:
Is it better to copy a copy from Flink here?
##########
docs/data_node/extract_node/mysql-cdc.md:
##########
@@ -14,7 +14,7 @@ The MySQL Extract Node allows for reading snapshot data and
incremental data fro
## Dependencies
-In order to setup the MySQL Extract Node, the following table provides
dependency information for both projects using a build automation tool (such as
Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
Review Comment:
Why it is changed to `set up`?
##########
docs/data_node/extract_node/kafka.md:
##########
@@ -3,6 +3,157 @@ title: Kafka
sidebar_position: 4
---
-## Configuration
-the Dashboard has not supported extracting data from Kafka for this version,
-you can create Kafka data streams from the background via the [Command-line
Tools](user_guide/command_line_tools.md).
\ No newline at end of file
+## Kafka Extract Node
+
+The `Kafka Extract Node` supports to read data from Kafka topics. It can
support read data in the normal fashion and read data in the
+upsert fashion. The `upsert-kafka` connector produces a `changelog stream`,
where each data record represents an `update` or
+`delete` event. The `kafka-inlong` connector can read data and metadata.
+
+## Supported Version
+
+| Extract Node | Kafka version |
+|-----------------------------|---------------|
+| [Kafka](./kafka.md) | universal |
Review Comment:
The kafka version is best to be consistent with the overview.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]