This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new e692e7b4b  [FLINK-36052][docs][cdc-pipeline-connector/es] Add flink cdc pipeline elasticsearch connector docs
e692e7b4b is described below

commit e692e7b4bca0f71976748b23d716212e0ecce3a0
Author: Junbo wang <beryllw...@gmail.com>
AuthorDate: Mon Nov 4 20:13:21 2024 +0800

    [FLINK-36052][docs][cdc-pipeline-connector/es] Add flink cdc pipeline elasticsearch connector docs

    This closes #3649.

    Co-authored-by: wangjunbo <wangju...@qiyi.com>
---
 .../docs/connectors/flink-sources/vitess-cdc.md    |   2 +-
 .../pipeline-connectors/elasticsearch.md           | 275 +++++++++++++++++++++
 .../docs/connectors/flink-sources/vitess-cdc.md    |   2 +-
 .../pipeline-connectors/elasticsearch.md           | 275 +++++++++++++++++++++
 .../pom.xml                                        |   2 +-
 5 files changed, 553 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/vitess-cdc.md b/docs/content.zh/docs/connectors/flink-sources/vitess-cdc.md
index cdc975de1..0ac1f9b72 100644
--- a/docs/content.zh/docs/connectors/flink-sources/vitess-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/vitess-cdc.md
@@ -49,7 +49,7 @@ more released versions will be available in the Maven central warehouse.
 
 Setup Vitess server
 ----------------
-You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-started/local-docker/), or the Vitess Operator for [Kubernetes guide](https://vitess.io/docs/get-started/operator/) to install Vitess. No special setup is needed to support Vitess connector.
+You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-started/vttestserver-docker-image/), or the Vitess Operator for [Kubernetes guide](https://vitess.io/docs/get-started/operator/) to install Vitess. No special setup is needed to support Vitess connector.
 
 ### Checklist
 * Make sure that the VTGate host and its gRPC port (default is 15991) is accessible from the machine where the Vitess connector is installed

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
new file mode 100644
index 000000000..939cfc6d8
--- /dev/null
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
@@ -0,0 +1,275 @@
+---
+title: "Elasticsearch"
+weight: 7
+type: docs
+aliases:
+- /connectors/pipeline-connectors/elasticsearch
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Elasticsearch Pipeline Connector
+
+Elasticsearch Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入 Elasticsearch。本文档介绍如何设置 Elasticsearch Pipeline 连接器。
+
+
+How to create Pipeline
+----------------
+
+从 MySQL 读取数据同步到 Elasticsearch 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: elasticsearch
+  name: Elasticsearch Sink
+  hosts: http://127.0.0.1:9200,http://127.0.0.1:9201
+
+route:
+  - source-table: adb.\.*
+    sink-table: default_index
+    description: sync adb.\.* table to default_index
+
+pipeline:
+  name: MySQL to Elasticsearch Pipeline
+  parallelism: 2
+```
+
+Pipeline Connector Options
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-left" style="width: 8%">Required</th>
+      <th class="text-left" style="width: 7%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 50%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>指定要使用的连接器,这里需要设置成 <code>'elasticsearch'</code>。</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Sink 的名称。</td>
+    </tr>
+    <tr>
+      <td>hosts</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>要连接到的一台或多台 Elasticsearch 主机,例如:'http://host_name:9200,http://host_name:9201'。</td>
+    </tr>
+    <tr>
+      <td>version</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">7</td>
+      <td>Integer</td>
+      <td>指定要连接的 Elasticsearch 集群的主版本,有效值为:
+        <ul>
+          <li>6: 连接到 Elasticsearch 6.x 的集群。</li>
+          <li>7: 连接到 Elasticsearch 7.x 的集群。</li>
+          <li>8: 连接到 Elasticsearch 8.x 的集群。</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>用于连接 Elasticsearch 实例认证的用户名。</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>用于连接 Elasticsearch 实例认证的密码。</td>
+    </tr>
+    <tr>
+      <td>batch.size.max</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>每个批量请求的最大缓冲操作数。可以设置为 '0' 来禁用它。</td>
+    </tr>
+    <tr>
+      <td>inflight.requests.max</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5</td>
+      <td>Integer</td>
+      <td>连接器将尝试执行的最大并发请求数。</td>
+    </tr>
+    <tr>
+      <td>buffered.requests.max</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>内存缓冲区中最多保留的请求数。</td>
+    </tr>
+    <tr>
+      <td>batch.size.max.bytes</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>每个批量请求的缓冲操作在内存中的最大值(以 byte 为单位)。</td>
+    </tr>
+    <tr>
+      <td>buffer.time.max.ms</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>每个批量请求的缓冲 flush 操作的间隔(以 ms 为单位)。</td>
+    </tr>
+    <tr>
+      <td>record.size.max.bytes</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10485760</td>
+      <td>Long</td>
+      <td>单个记录的最大大小(以 byte 为单位)。</td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+Usage Notes
+--------
+
+* 写入 Elasticsearch 的 index 默认为与上游表同名字符串,可以通过 pipeline 的 route 功能进行修改,示例见下文。
+
+* 如果写入 Elasticsearch 的 index 不存在,不会被默认创建。
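+
+下面给出一个通过 route 修改目标 index 的配置片段(仅为示意,其中的 index 名称均为假设的示例):
+
+```yaml
+route:
+  # 将 adb 库下的所有表写入同一个 index(假设的 index 名称)
+  - source-table: adb.\.*
+    sink-table: adb_all_tables
+  # 将分片表 user_table_[0-9]+ 写入 user_index
+  - source-table: bdb.user_table_[0-9]+
+    sink-table: user_index
+```
+
+Data Type Mapping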
+----------------
+Elasticsearch 将文档存储为 JSON 字符串,数据类型之间的映射关系如下表所示:
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">CDC type</th>
+      <th class="text-left">JSON type</th>
+      <th class="text-left" style="width:60%;">NOTE</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>TINYINT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DECIMAL(p, s)</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>STRING</td>
+      <td>with format: date (yyyy-MM-dd), example: 2024-10-21</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP</td>
+      <td>STRING</td>
+      <td>with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP_LTZ</td>
+      <td>STRING</td>
+      <td>with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000</td>
+    </tr>
+    <tr>
+      <td>CHAR(n)</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>VARCHAR(n)</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>ARRAY</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>MAP</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+{{< top >}}
\ No newline at end of file

diff --git a/docs/content/docs/connectors/flink-sources/vitess-cdc.md b/docs/content/docs/connectors/flink-sources/vitess-cdc.md
index cdc975de1..0ac1f9b72 100644
--- a/docs/content/docs/connectors/flink-sources/vitess-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/vitess-cdc.md
@@ -49,7 +49,7 @@ more released versions will be available in the Maven central warehouse.
 
 Setup Vitess server
 ----------------
-You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-started/local-docker/), or the Vitess Operator for [Kubernetes guide](https://vitess.io/docs/get-started/operator/) to install Vitess. No special setup is needed to support Vitess connector.
+You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-started/vttestserver-docker-image/), or the Vitess Operator for [Kubernetes guide](https://vitess.io/docs/get-started/operator/) to install Vitess. No special setup is needed to support Vitess connector.
 
 ### Checklist
 * Make sure that the VTGate host and its gRPC port (default is 15991) is accessible from the machine where the Vitess connector is installed

diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
new file mode 100644
index 000000000..579d7015e
--- /dev/null
+++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
@@ -0,0 +1,275 @@
+---
+title: "Elasticsearch"
+weight: 7
+type: docs
+aliases:
+- /connectors/pipeline-connectors/elasticsearch
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Elasticsearch Pipeline Connector
+
+The Elasticsearch Pipeline connector can be used as the *Data Sink* of the pipeline, and writes data to Elasticsearch. This document describes how to set up the Elasticsearch Pipeline connector.
+
+
+How to create Pipeline
+----------------
+
+A pipeline that reads data from MySQL and sinks it to Elasticsearch can be defined as follows:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: elasticsearch
+  name: Elasticsearch Sink
+  hosts: http://127.0.0.1:9200,http://127.0.0.1:9201
+
+route:
+  - source-table: adb.\.*
+    sink-table: default_index
+    description: sync adb.\.* table to default_index
+
+pipeline:
+  name: MySQL to Elasticsearch Pipeline
+  parallelism: 2
+```
+
+Pipeline Connector Options
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-left" style="width: 8%">Required</th>
+      <th class="text-left" style="width: 7%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 50%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, here should be <code>'elasticsearch'</code>.</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the sink.</td>
+    </tr>
+    <tr>
+      <td>hosts</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9200,http://host_name:9201'.</td>
+    </tr>
+    <tr>
+      <td>version</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">7</td>
+      <td>Integer</td>
+      <td>The major version of the Elasticsearch cluster to connect to, valid values are:
+        <ul>
+          <li>6: connect to Elasticsearch 6.x cluster.</li>
+          <li>7: connect to Elasticsearch 7.x cluster.</li>
+          <li>8: connect to Elasticsearch 8.x cluster.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The username for Elasticsearch authentication.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The password for Elasticsearch authentication.</td>
+    </tr>
+    <tr>
+      <td>batch.size.max</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request. Can be set to '0' to disable it.</td>
+    </tr>
+    <tr>
+      <td>inflight.requests.max</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5</td>
+      <td>Integer</td>
+      <td>The maximum number of concurrent requests that the sink will try to execute.</td>
+    </tr>
+    <tr>
+      <td>buffered.requests.max</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>The maximum number of requests to keep in the in-memory buffer.</td>
+    </tr>
+    <tr>
+      <td>batch.size.max.bytes</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>The maximum size of batch requests in bytes.</td>
+    </tr>
+    <tr>
+      <td>buffer.time.max.ms</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum time to wait for incomplete batches before flushing.</td>
+    </tr>
+    <tr>
+      <td>record.size.max.bytes</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10485760</td>
+      <td>Long</td>
+      <td>The maximum size of a single record in bytes.</td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+Usage Notes
+--------
+
+* By default, records are written to the Elasticsearch index whose name is the `namespace.schemaName.tableName` string of the upstream TableId; this can be changed using the route feature of the pipeline (see the example at the end of this page).
+
+* Indexes are not created automatically: if the target index does not exist, the connector will not create it.
+
+Data Type Mapping
+----------------
+Elasticsearch stores documents as JSON strings, so the mapping below is between Flink CDC data types and JSON data types.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">CDC type</th>
+      <th class="text-left">JSON type</th>
+      <th class="text-left" style="width:60%;">NOTE</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>TINYINT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>NUMBER</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DECIMAL(p, s)</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>STRING</td>
+      <td>with format: date (yyyy-MM-dd), example: 2024-10-21</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP</td>
+      <td>STRING</td>
+      <td>with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP_LTZ</td>
+      <td>STRING</td>
+      <td>with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000</td>
+    </tr>
+    <tr>
+      <td>CHAR(n)</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>VARCHAR(n)</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>ARRAY</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>MAP</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>STRING</td>
+      <td></td>
+    </tr>
+  </tbody>
+</table>
+</div>
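+
+As a quick reference, the following is a sketch of a more fully tuned sink configuration exercising the options above; the hosts, credentials, option values and index name are hypothetical placeholders, not recommended settings:
+
+```yaml
+sink:
+  type: elasticsearch
+  name: Elasticsearch Sink
+  hosts: http://127.0.0.1:9200,http://127.0.0.1:9201
+  version: 8
+  # authentication is optional; placeholder credentials shown here
+  username: elastic
+  password: changeme
+  # flush a bulk request after 1000 buffered actions or 2 seconds,
+  # whichever comes first
+  batch.size.max: 1000
+  buffer.time.max.ms: 2000
+
+route:
+  # write every table of database adb into one index (placeholder name)
+  - source-table: adb.\.*
+    sink-table: adb_index
+```
+
+{{< top >}}
\ No newline at end of file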

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
index 896019c81..f634ec7a7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
@@ -197,7 +197,7 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-composer</artifactId>
-            <version>${revision}</version>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>