This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 09fdeb75f7a1907d62249a3855d61538defc025d Author: MartijnVisser <[email protected]> AuthorDate: Mon Apr 25 13:20:06 2022 +0200 [FLINK-27394][Documentation] Remove existing Elasticsearch documentation since it has been moved to apache/flink-connector-elasticsearch --- .../docs/connectors/datastream/elasticsearch.md | 380 --------------------- .../docs/connectors/table/elasticsearch.md | 288 ---------------- .../docs/connectors/datastream/elasticsearch.md | 362 -------------------- .../content/docs/connectors/table/elasticsearch.md | 289 ---------------- 4 files changed, 1319 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/elasticsearch.md b/docs/content.zh/docs/connectors/datastream/elasticsearch.md deleted file mode 100644 index aea933e17a8..00000000000 --- a/docs/content.zh/docs/connectors/datastream/elasticsearch.md +++ /dev/null @@ -1,380 +0,0 @@ ---- -title: Elasticsearch -weight: 5 -type: docs -aliases: - - /zh/dev/connectors/elasticsearch.html - - /zh/apis/streaming/connectors/elasticsearch.html - - /zh/dev/connectors/elasticsearch2.html - - /zh/apis/streaming/connectors/elasticsearch2.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> - -# Elasticsearch 连接器 - -此连接器提供可以向 [Elasticsearch](https://elastic.co/) 索引请求文档操作的 sinks。 -要使用此连接器,请根据 Elasticsearch 的安装版本将以下依赖之一添加到你的项目中: - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Elasticsearch 版本</th> - <th class="text-left">Maven 依赖</th> - </tr> - </thead> - <tbody> - <tr> - <td>6.x</td> - <td>{{< artifact flink-connector-elasticsearch6 >}}</td> - </tr> - <tr> - <td>7.x</td> - <td>{{< artifact flink-connector-elasticsearch7 >}}</td> - </tr> - </tbody> -</table> - -请注意,流连接器目前不是二进制发行版的一部分。 -有关如何将程序和用于集群执行的库一起打包,参考[此文档]({{< ref "docs/dev/configuration/overview" >}})。 - -## 安装 Elasticsearch - -Elasticsearch 集群的设置可以参考[此文档](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html)。 - -## Elasticsearch Sink - -下面的示例展示了如何配置并创建一个 sink: - -{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}} -{{< tab "Java" >}} -Elasticsearch 6: -```java -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch6SinkBuilder<String>() - // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来 - .setBulkFlushMaxActions(1) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - .build()); - -private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); -} -``` - -Elasticsearch 7: -```java -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch7SinkBuilder<String>() - // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来 - .setBulkFlushMaxActions(1) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - .build()); - -private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .id(element) - .source(json); -} -``` -{{< /tab >}} -{{< tab "Scala" >}} -Elasticsearch 6: -```scala -import org.apache.flink.api.connector.sink.SinkWriter -import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.http.HttpHost -import org.elasticsearch.action.index.IndexRequest -import org.elasticsearch.client.Requests - -val input: DataStream[String] = ... 
- -input.sinkTo( - new Elasticsearch6SinkBuilder[String] - // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来 - .setBulkFlushMaxActions(1) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - .build()) - -def createIndexRequest(element: (String)): IndexRequest = { - - val json = Map( - "data" -> element.asInstanceOf[AnyRef] - ) - - Requests.indexRequest.index("my-index").source(mapAsJavaMap(json)) -} -``` - -Elasticsearch 7: -```scala -import org.apache.flink.api.connector.sink.SinkWriter -import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.http.HttpHost -import org.elasticsearch.action.index.IndexRequest -import org.elasticsearch.client.Requests - -val input: DataStream[String] = ... - -input.sinkTo( - new Elasticsearch7SinkBuilder[String] - // 下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来 - .setBulkFlushMaxActions(1) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - .build()) - -def createIndexRequest(element: (String)): IndexRequest = { - - val json = Map( - "data" -> element.asInstanceOf[AnyRef] - ) - - Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json)) -} -``` -{{< /tab >}} -{{< /tabs >}} - -需要注意的是,该示例仅演示了对每个传入的元素执行单个索引请求。 -通常,`ElasticsearchSinkFunction` 可用于执行多个不同类型的请求(例如 `DeleteRequest`、 `UpdateRequest` 等)。 - -在内部,Flink Elasticsearch Sink 的每个并行实例使用一个 `BulkProcessor` 向集群发送操作请求。 -这会在元素批量发送到集群之前进行缓存。 -`BulkProcessor` 一次执行一个批量请求,即不会存在两个并行刷新缓存的操作。 - -### Elasticsearch Sinks 和容错 - -通过启用 Flink checkpoint,Flink Elasticsearch Sink 保证至少一次将操作请求发送到 Elasticsearch 集群。 -这是通过在进行 checkpoint 时等待 `BulkProcessor` 中所有挂起的操作请求来实现。 -这有效地保证了在触发 checkpoint 之前所有的请求被 Elasticsearch 成功确认,然后继续处理发送到 sink 的记录。 - -关于 checkpoint 和容错的更多详细信息,请参见[容错文档]({{< ref "docs/learn-flink/fault_tolerance" >}})。 - -要使用具有容错特性的 Elasticsearch Sinks,需要在执行环境中启用作业拓扑的 checkpoint: - -{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}} -{{< tab "Java" >}} -Elasticsearch 6: -```java -final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.enableCheckpointing(5000); // 每 5000 毫秒执行一次 checkpoint - -Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>() - .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))); -``` - -Elasticsearch 7: -```java -final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.enableCheckpointing(5000); // 每 5000 毫秒执行一次 checkpoint - -Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>() - .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))); -``` -{{< /tab >}} -{{< tab "Scala" >}} -Elasticsearch 6: -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() -env.enableCheckpointing(5000) // 每 5000 毫秒执行一次 checkpoint - -val sinkBuilder = new Elasticsearch6SinkBuilder[String] - .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - 
.setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) -``` - -Elasticsearch 7: -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() -env.enableCheckpointing(5000) // 每 5000 毫秒执行一次 checkpoint - -val sinkBuilder = new Elasticsearch7SinkBuilder[String] - .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) -``` -{{< /tab >}} -{{< /tabs >}} - -<p style="border-radius: 5px; padding: 5px" class="bg-info"> -Using UpdateRequests with deterministic ids and the upsert method it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector. -</p> - -### 处理失败的 Elasticsearch 请求 - -Elasticsearch 操作请求可能由于多种原因而失败,包括节点队列容量暂时已满或者要被索引的文档格式错误。 -Flink Elasticsearch Sink 允许用户通过通过指定一个退避策略来重试请求。 - -下面是一个例子: - -{{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}} -{{< tab "Java" >}} -Elasticsearch 6: -```java -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch6SinkBuilder<String>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5 - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()); -``` - -Elasticsearch 7: -```java -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch7SinkBuilder<String>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5 - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()); -``` -{{< /tab >}} -{{< tab "Scala" >}} -Elasticsearch 6: -```scala -val input: DataStream[String] = ... - -input.sinkTo( - new Elasticsearch6SinkBuilder[String] - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5 - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()) -``` - -Elasticsearch 7: -```scala -val input: DataStream[String] = ... 
- -input.sinkTo( - new Elasticsearch7SinkBuilder[String] - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - // 这里启用了一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5 - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()) -``` -{{< /tab >}} -{{< /tabs >}} - -上面的示例 sink 重新添加由于资源受限(例如:队列容量已满)而失败的请求。对于其它类型的故障,例如文档格式错误,sink 将会失败。 -如若未设置 BulkFlushBackoffStrategy (或者 `FlushBackoffType.NONE`),那么任何类型的错误都会导致 sink 失败。 - -<p style="border-radius: 5px; padding: 5px" class="bg-danger"> -<b>重要提示</b>:在失败时将请求重新添加回内部 <b>BulkProcessor</b> 会导致更长的 checkpoint,因为在进行 checkpoint 时,sink 还需要等待重新添加的请求被刷新。 -例如,当使用 <b>FlushBackoffType.EXPONENTIAL</b> 时, -checkpoint 会进行等待,直到 Elasticsearch 节点队列有足够的容量来处理所有挂起的请求,或者达到最大重试次数。 -</p> - -### 配置内部批量处理器 - -通过使用以下在 Elasticsearch6SinkBuilder 中提供的方法,可以进一步配置内部的 `BulkProcessor` 关于其如何刷新缓存操作请求的行为: - - * **setBulkFlushMaxActions(int numMaxActions)**:刷新前最大缓存的操作数。 - * **setBulkFlushMaxSizeMb(int maxSizeMb)**:刷新前最大缓存的数据量(以兆字节为单位)。 - * **setBulkFlushInterval(long intervalMillis)**:刷新的时间间隔(不论缓存操作的数量或大小如何)。 - -还支持配置如何对暂时性请求错误进行重试: - - * **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**:退避延迟的类型,`CONSTANT` 或者 `EXPONENTIAL`,退避重试次数,退避重试的时间间隔。 - 对于常量延迟来说,此值是每次重试间的间隔。对于指数延迟来说,此值是延迟的初始值。 - -可以在[此文档](https://elastic.co)找到 Elasticsearch 的更多信息。 - -## 将 Elasticsearch 连接器打包到 Uber-Jar 中 - -建议构建一个包含所有依赖的 uber-jar (可执行的 jar),以便更好地执行你的 Flink 程序。 -(更多信息参见[此文档]({{< ref "docs/dev/configuration/overview" >}}))。 - -或者,你可以将连接器的 jar 文件放入 Flink 的 `lib/` 目录下,使其在全局范围内可用,即可用于所有的作业。 - -{{< top >}} diff --git a/docs/content.zh/docs/connectors/table/elasticsearch.md b/docs/content.zh/docs/connectors/table/elasticsearch.md deleted file mode 100644 index 3395eda1370..00000000000 --- a/docs/content.zh/docs/connectors/table/elasticsearch.md +++ /dev/null @@ -1,288 +0,0 @@ ---- -title: Elasticsearch -weight: 7 -type: docs -aliases: - - /zh/dev/table/connectors/elasticsearch.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> - -# Elasticsearch SQL 连接器 - -{{< label "Sink: Batch" >}} -{{< label "Sink: Streaming Append & Upsert Mode" >}} - -Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。 - -连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。 - -如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。 - -依赖 ------------- - -{{< sql_download_table "elastic" >}} - -Elasticsearch 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "docs/dev/configuration/overview" >}})了解如何在集群运行中引用 Elasticsearch 连接器。 - -如何创建 Elasticsearch 表 ----------------- - -以下示例展示了如何创建 Elasticsearch sink 表: - -```sql -CREATE TABLE myUserTable ( - user_id STRING, - user_name STRING, - uv BIGINT, - pv BIGINT, - PRIMARY KEY (user_id) NOT ENFORCED -) WITH ( - 'connector' = 'elasticsearch-7', - 'hosts' = 'http://localhost:9200', - 'index' = 'users' -); -``` - -连接器参数 ----------------- - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 25%">参数</th> - <th class="text-center" style="width: 8%">是否必选</th> - <th class="text-center" style="width: 7%">默认值</th> - <th class="text-center" style="width: 10%">数据类型</th> - <th class="text-center" style="width: 50%">描述</th> - </tr> - </thead> - <tbody> - <tr> - <td><h5>connector</h5></td> - <td>必选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>指定要使用的连接器,有效值为: - <ul> - <li><code>elasticsearch-6</code>:连接到 Elasticsearch 6.x 的集群。</li> - <li><code>elasticsearch-7</code>:连接到 Elasticsearch 7.x 及更高版本的集群。</li> - </ul></td> - </tr> - <tr> - <td><h5>hosts</h5></td> - <td>必选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>要连接到的一台或多台 Elasticsearch 主机,例如 <code>'http://host_name:9092;http://host_name:9093'</code>。</td> - </tr> - <tr> - <td><h5>index</h5></td> - <td>必选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 <code>'myIndex'</code>)或一个动态索引(例如 <code>'index-{log_ts|yyyy-MM-dd}'</code>)。 - 更多详细信息,请参见下面的<a href="#动态索引">动态索引</a>部分。</td> - </tr> - <tr> - <td><h5>document-type</h5></td> - <td>6.x 版本中必选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Elasticsearch 文档类型。在 <code>elasticsearch-7</code> 中不再需要。</td> - </tr> - <tr> - <td><h5>document-id.key-delimiter</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">_</td> - <td>String</td> - <td>复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。</td> - </tr> - <tr> - <td><h5>username</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下<a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">指南</a>启用它来保护 Elasticsearch 集群。</td> - </tr> - <tr> - <td><h5>password</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>用于连接 Elasticsearch 实例的密码。如果配置了<code>username</code>,则此选项也必须配置为非空字符串。</td> - </tr> - <tr> - <td><h5>failure-handler</h5></td> - <td>optional</td> - <td style="word-wrap: break-word;">fail</td> - <td>String</td> - <td>对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为: - <ul> - <li><code>fail</code>:如果请求失败并因此导致作业失败,则抛出异常。</li> - <li><code>ignore</code>:忽略失败并放弃请求。</li> - <li><code>retry-rejected</code>:重新添加由于队列容量饱和而失败的请求。</li> - <li>自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。</li> - </ul> - </td> - </tr> - <tr> - <td><h5>sink.flush-on-checkpoint</h5></td> - <td>optional</td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>在进行 checkpoint 
时是否保证刷出缓冲区中的数据。如果关闭这一选项,在进行checkpoint时 sink 将不再为所有进行 - 中的请求等待 Elasticsearch 的执行完成确认。因此,在这种情况下 sink 将不对至少一次的请求的一致性提供任何保证。 - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.max-actions</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">1000</td> - <td>Integer</td> - <td>每个批量请求的最大缓冲操作数。 - 可以设置为<code>'0'</code>来禁用它。 - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.max-size</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">2mb</td> - <td>MemorySize</td> - <td>每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 - 可以设置为<code>'0'</code>来禁用它。 - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.interval</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">1s</td> - <td>Duration</td> - <td>flush 缓冲操作的间隔。 - 可以设置为<code>'0'</code>来禁用它。注意,<code>'sink.bulk-flush.max-size'</code>和<code>'sink.bulk-flush.max-actions'</code>都设置为<code>'0'</code>的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。 - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.backoff.strategy</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">DISABLED</td> - <td>String</td> - <td>指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为: - <ul> - <li><code>DISABLED</code>:不执行重试,即第一次请求错误后失败。</li> - <li><code>CONSTANT</code>:等待重试之间的回退延迟。</li> - <li><code>EXPONENTIAL</code>:先等待回退延迟,然后在重试之间指数递增。</li> - </ul> - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.backoff.max-retries</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Integer</td> - <td>最大回退重试次数。</td> - </tr> - <tr> - <td><h5>sink.bulk-flush.backoff.delay</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Duration</td> - <td>每次退避尝试之间的延迟。对于 <code>CONSTANT</code> 退避策略,该值是每次重试之间的延迟。对于 <code>EXPONENTIAL</code> 退避策略,该值是初始的延迟。</td> - </tr> - <tr> - <td><h5>connection.path-prefix</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>添加到每个 REST 通信中的前缀字符串,例如,<code>'/v1'</code>。</td> - </tr> - <tr> - <td><h5>connection.request-timeout</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Duration</td> - <td>从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。</td> - </tr> - <tr> - <td><h5>connection.timeout</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Duration</td> - <td>建立请求的超时时间 。超时时间必须大于或者等于 0 ,如果设置为 0 则是无限超时。</td> - </tr> - <tr> - <td><h5>socket.timeout</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Duration</td> - <td>等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。 - </td> - </tr> - <tr> - <td><h5>format</h5></td> - <td>可选</td> - <td style="word-wrap: break-word;">json</td> - <td>String</td> - <td>Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 - 默认使用内置的 <code>'json'</code> 格式。更多详细信息,请参阅 <a href="{{< ref "docs/connectors/table/formats/overview" >}}">JSON Format</a> 页面。 - </td> - </tr> - </tbody> -</table> - -特性 ----------------- - -### Key 处理 - -Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 -如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 -如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。 - -在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。 -Elasticsearch 连接器通过使用 `document-id.key-delimiter` 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。 -某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,`BYTES`,`ROW`,`ARRAY`,`MAP` 等。 -如果未指定主键,Elasticsearch 将自动生成文档 id。 - -有关 PRIMARY KEY 语法的更多详细信息,请参见 [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 - -### 动态索引 - -Elasticsearch sink 同时支持静态索引和动态索引。 - -如果你想使用静态索引,则 `index` 选项值应为纯字符串,例如 
`'myusers'`,所有记录都将被写入到 "myusers" 索引中。 - -如果你想使用动态索引,你可以使用 `{field_name}` 来引用记录中的字段值来动态生成目标索引。 -你也可以使用 `'{field_name|date_format_string}'` 将 `TIMESTAMP/DATE/TIME` 类型的字段值转换为 `date_format_string` 指定的格式。 -`date_format_string` 与 Java 的 [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html) 兼容。 -例如,如果选项值设置为 `'myusers-{log_ts|yyyy-MM-dd}'`,则 `log_ts` 字段值为 `2020-03-27 12:25:55` 的记录将被写入到 "myusers-2020-03-27" 索引中。 - -你也可以使用 `'{now()|date_format_string}'` 将当前的系统时间转换为 `date_format_string` 指定的格式。`now()` 对应的时间类型是 `TIMESTAMP_WITH_LTZ` 。 -在将系统时间格式化为字符串时会使用 session 中通过 `table.local-time-zone` 中配置的时区。 使用 `NOW()`, `now()`, `CURRENT_TIMESTAMP`, `current_timestamp` 均可以。 - -**注意:** 使用当前系统时间生成的动态索引时, 对于 changelog 的流,无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引,只能支持 append only 的流。 - -数据类型映射 ----------------- - -Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 -Flink 为 Elasticsearch 连接器使用内置的 `'json'` 格式。更多类型映射的详细信息,请参阅 [JSON Format]({{< ref "docs/connectors/table/formats/json" >}}) 页面。 - -{{< top >}} diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md deleted file mode 100644 index d467bb391b2..00000000000 --- a/docs/content/docs/connectors/datastream/elasticsearch.md +++ /dev/null @@ -1,362 +0,0 @@ ---- -title: Elasticsearch -weight: 5 -type: docs -aliases: - - /dev/connectors/elasticsearch.html - - /apis/streaming/connectors/elasticsearch.html - - /dev/connectors/elasticsearch2.html - - /apis/streaming/connectors/elasticsearch2.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -# Elasticsearch Connector - -This connector provides sinks that can request document actions to an -[Elasticsearch](https://elastic.co/) Index. To use this connector, add one -of the following dependencies to your project, depending on the version -of the Elasticsearch installation: - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Elasticsearch version</th> - <th class="text-left">Maven Dependency</th> - </tr> - </thead> - <tbody> - <tr> - <td>6.x</td> - <td>{{< artifact flink-connector-elasticsearch6 >}}</td> - </tr> - <tr> - <td>7.x</td> - <td>{{< artifact flink-connector-elasticsearch7 >}}</td> - </tr> - </tbody> -</table> - -Note that the streaming connectors are currently not part of the binary -distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information -about how to package the program with the libraries for cluster execution. - -## Installing Elasticsearch - -Instructions for setting up an Elasticsearch cluster can be found -[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). 
- -## Elasticsearch Sink - -The example below shows how to configure and create a sink: - -{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}} -{{< tab "Java" >}} -Elasticsearch 6: -```java -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch6SinkBuilder<String>() - .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - .build()); - -private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); -} -``` - -Elasticsearch 7: -```java -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch7SinkBuilder<String>() - .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - .build()); - -private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .id(element) - .source(json); -} -``` -{{< /tab >}} -{{< tab "Scala" >}} -Elasticsearch 6: -```scala -import org.apache.flink.api.connector.sink.SinkWriter -import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch6SinkBuilder, RequestIndexer} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.http.HttpHost -import org.elasticsearch.action.index.IndexRequest -import org.elasticsearch.client.Requests - -val input: DataStream[String] = ... 
- -input.sinkTo( - new Elasticsearch6SinkBuilder[String] - .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - .build()) - -def createIndexRequest(element: (String)): IndexRequest = { - - val json = Map( - "data" -> element.asInstanceOf[AnyRef] - ) - - Requests.indexRequest.index("my-index").`type`("my-type").source(mapAsJavaMap(json)) -} -``` - -Elasticsearch 7: -```scala -import org.apache.flink.api.connector.sink.SinkWriter -import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.http.HttpHost -import org.elasticsearch.action.index.IndexRequest -import org.elasticsearch.client.Requests - -val input: DataStream[String] = ... - -input.sinkTo( - new Elasticsearch7SinkBuilder[String] - .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - .build()) - -def createIndexRequest(element: (String)): IndexRequest = { - - val json = Map( - "data" -> element.asInstanceOf[AnyRef] - ) - - Requests.indexRequest.index("my-index").source(mapAsJavaMap(json)) -} -``` -{{< /tab >}} -{{< /tabs >}} - -Note that the example only demonstrates performing a single index -request for each incoming element. Generally, the `ElasticsearchEmitter` -can be used to perform requests of different types (ex., -`DeleteRequest`, `UpdateRequest`, etc.). - -Internally, each parallel instance of the Flink Elasticsearch Sink uses -a `BulkProcessor` to send action requests to the cluster. -This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor` -executes bulk requests one at a time, i.e. there will be no two concurrent -flushes of the buffered actions in progress. - -### Elasticsearch Sinks and Fault Tolerance - -With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees -at-least-once delivery of action requests to Elasticsearch clusters. It does -so by waiting for all pending action requests in the `BulkProcessor` at the -time of checkpoints. This effectively assures that all requests before the -checkpoint was triggered have been successfully acknowledged by Elasticsearch, before -proceeding to process more records sent to the sink. - -More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}). 
- -To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment: - -{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}} -{{< tab "Java" >}} -```java -final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.enableCheckpointing(5000); // checkpoint every 5000 msecs -``` -{{< /tab >}} -{{< tab "Scala" >}} -Elasticsearch 6: -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() -env.enableCheckpointing(5000) // checkpoint every 5000 msecs -``` -{{< /tab >}} -{{< /tabs >}} - -<p style="border-radius: 5px; padding: 5px" class="bg-info"> -<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default delivery guarantee is AT_LEAST_ONCE. -This causes the sink to buffer requests until it either finishes or the BulkProcessor flushes automatically. -By default, the BulkProcessor will flush after 1000 added Actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>. -</p> - -<p style="border-radius: 5px; padding: 5px" class="bg-info"> -Using UpdateRequests with deterministic ids and the upsert method it is possible to achieve exactly-once semantics in Elasticsearch when AT_LEAST_ONCE delivery is configured for the connector. -</p> - - -### Handling Failing Elasticsearch Requests - -Elasticsearch action requests may fail due to a variety of reasons, including -temporarily saturated node queue capacity or malformed documents to be indexed. -The Flink Elasticsearch Sink allows the user to retry requests by specifying a backoff-policy. - -Below is an example: - -{{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}} -{{< tab "Java" >}} -Elasticsearch 6: -```java -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch6SinkBuilder<String>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()); -``` - -Elasticsearch 7: -```java -DataStream<String> input = ...; - -input.sinkTo( - new Elasticsearch7SinkBuilder<String>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element))) - // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()); -``` -{{< /tab >}} -{{< tab "Scala" >}} -Elasticsearch 6: -```scala -val input: DataStream[String] = ... - -input.sinkTo( - new Elasticsearch6SinkBuilder[String] - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()) -``` - -Elasticsearch 7: -```scala -val input: DataStream[String] = ... 
- -input.sinkTo( - new Elasticsearch7SinkBuilder[String] - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element))) - // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds - .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) - .build()) -``` -{{< /tab >}} -{{< /tabs >}} - -The above example will let the sink re-add requests that failed due to resource constrains (e.g. -queue capacity saturation). For all other failures, such as malformed documents, the sink will fail. -If no BulkFlushBackoffStrategy (or `FlushBackoffType.NONE`) is configured, the sink will fail for any kind of error. - -<p style="border-radius: 5px; padding: 5px" class="bg-danger"> -<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b> -on failures will lead to longer checkpoints, as the sink will also -need to wait for the re-added requests to be flushed when checkpointing. -For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints -will need to wait until Elasticsearch node queues have enough capacity for -all the pending requests, or until the maximum number of retries has been reached. -</p> - -### Configuring the Internal Bulk Processor - -The internal `BulkProcessor` can be further configured for its behaviour -on how buffered action requests are flushed, by using the following methods of the Elasticsearch6SinkBuilder: - -* **setBulkFlushMaxActions(int numMaxActions)**: Maximum amount of actions to buffer before flushing. -* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing. -* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the amount or size of buffered actions. - -Configuring how temporary request errors are retried is also supported: - * **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`, the amount of backoff retries to attempt, the amount of delay for backoff. For constant backoff, this - is simply the delay between each retry. For exponential backoff, this is the initial base delay. - -More information about Elasticsearch can be found [here](https://elastic.co). - -## Packaging the Elasticsearch Connector into an Uber-Jar - -For the execution of your Flink program, it is recommended to build a -so-called uber-jar (executable jar) containing all your dependencies -(see [here]({{< ref "docs/dev/configuration" >}}) for further information). - -Alternatively, you can put the connector's jar file into Flink's `lib/` folder to make it available -system-wide, i.e. for all job being run. - -{{< top >}} diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md deleted file mode 100644 index 7e5cf99a8ce..00000000000 --- a/docs/content/docs/connectors/table/elasticsearch.md +++ /dev/null @@ -1,289 +0,0 @@ ---- -title: Elasticsearch -weight: 7 -type: docs -aliases: - - /dev/table/connectors/elasticsearch.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -# Elasticsearch SQL Connector - -{{< label "Sink: Batch" >}} -{{< label "Sink: Streaming Append & Upsert Mode" >}} - -The Elasticsearch connector allows for writing into an index of the Elasticsearch engine. This document describes how to setup the Elasticsearch Connector to run SQL queries against Elasticsearch. - -The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system using the primary key defined on the DDL. - -If no primary key is defined on the DDL, the connector can only operate in append mode for exchanging INSERT only messages with external system. - -Dependencies ------------- - -{{< sql_download_table "elastic" >}} - -The Elasticsearch connector is not part of the binary distribution. -See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}). - -How to create an Elasticsearch table ----------------- - -The example below shows how to create an Elasticsearch sink table: - -```sql -CREATE TABLE myUserTable ( - user_id STRING, - user_name STRING, - uv BIGINT, - pv BIGINT, - PRIMARY KEY (user_id) NOT ENFORCED -) WITH ( - 'connector' = 'elasticsearch-7', - 'hosts' = 'http://localhost:9200', - 'index' = 'users' -); -``` - -Connector Options ----------------- - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 25%">Option</th> - <th class="text-center" style="width: 8%">Required</th> - <th class="text-center" style="width: 8%">Forwarded</th> - <th class="text-center" style="width: 7%">Default</th> - <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 42%">Description</th> - </tr> - </thead> - <tbody> - <tr> - <td><h5>connector</h5></td> - <td>required</td> - <td>no</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Specify what connector to use, valid values are: - <ul> - <li><code>elasticsearch-6</code>: connect to Elasticsearch 6.x cluster.</li> - <li><code>elasticsearch-7</code>: connect to Elasticsearch 7.x cluster.</li> - </ul></td> - </tr> - <tr> - <td><h5>hosts</h5></td> - <td>required</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td> - </tr> - <tr> - <td><h5>index</h5></td> - <td>required</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or - a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>). - See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td> - </tr> - <tr> - <td><h5>document-type</h5></td> - <td>required in 6.x</td> - <td>yes in 6.x</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Elasticsearch document type. 
Not necessary anymore in <code>elasticsearch-7</code>.</td> - </tr> - <tr> - <td><h5>document-id.key-delimiter</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">_</td> - <td>String</td> - <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td> - </tr> - <tr> - <td><h5>username</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Username used to connect to Elasticsearch instance. Please notice that Elasticsearch doesn't pre-bundled security feature, but you can enable it by following the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">guideline</a> to secure an Elasticsearch cluster.</td> - </tr> - <tr> - <td><h5>password</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td> - </tr> - <tr> - <td><h5>failure-handler</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">fail</td> - <td>String</td> - <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: - <ul> - <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li> - <li><code>ignore</code>: ignores failures and drops the request.</li> - <li><code>retry-rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li> - <li>custom class name: for failure handling with a ActionRequestFailureHandler subclass.</li> - </ul> - </td> - </tr> - <tr> - <td><h5>sink.flush-on-checkpoint</h5></td> - <td>optional</td> - <td></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests - to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong - guarantees for at-least-once delivery of action requests. - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.max-actions</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">1000</td> - <td>Integer</td> - <td>Maximum number of buffered actions per bulk request. - Can be set to <code>'0'</code> to disable it. - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.max-size</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">2mb</td> - <td>MemorySize</td> - <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. - Can be set to <code>'0'</code> to disable it. - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.interval</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">1s</td> - <td>Duration</td> - <td>The interval to flush buffered actions. - Can be set to <code>'0'</code> to disable it. Note, both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code> - can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions. - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.backoff.strategy</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">DISABLED</td> - <td>String</td> - <td>Specify how to perform retries if any flush actions failed due to a temporary request error. 
Valid strategies are: - <ul> - <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li> - <li><code>CONSTANT</code>: wait for backoff delay between retries.</li> - <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li> - </ul> - </td> - </tr> - <tr> - <td><h5>sink.bulk-flush.backoff.max-retries</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Integer</td> - <td>Maximum number of backoff retries.</td> - </tr> - <tr> - <td><h5>sink.bulk-flush.backoff.delay</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>Duration</td> - <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td> - </tr> - <tr> - <td><h5>connection.path-prefix</h5></td> - <td>optional</td> - <td>yes</td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td> - </tr> - <tr> - <td><h5>format</h5></td> - <td>optional</td> - <td>no</td> - <td style="word-wrap: break-word;">json</td> - <td>String</td> - <td>Elasticsearch connector supports to specify a format. The format must produce a valid json document. - By default uses built-in <code>'json'</code> format. Please refer to <a href="{{< ref "docs/connectors/table/formats/overview" >}}">JSON Format</a> page for more details. - </td> - </tr> - </tbody> -</table> - -Features ----------------- - -### Key Handling - -The Elasticsearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined. -If a primary key is defined, the Elasticsearch sink works in upsert mode which can consume queries containing UPDATE/DELETE messages. -If a primary key is not defined, the Elasticsearch sink works in append mode which can only consume queries containing INSERT only messages. - -In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document id, which is a string of up to 512 bytes. It cannot have whitespaces. -The Elasticsearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL using a key delimiter specified by `document-id.key-delimiter`. -Certain types are not allowed as a primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc. -If no primary key is specified, Elasticsearch will generate a document id automatically. - -See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about the PRIMARY KEY syntax. - -### Dynamic Index - -The Elasticsearch sink supports both static index and dynamic index. - -If you want to have a static index, the `index` option value should be a plain string, e.g. `'myusers'`, all the records will be consistently written into "myusers" index. - -If you want to have a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate a target index. -You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by the `date_format_string`. -The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html). 
-For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with `log_ts` field value `2020-03-27 12:25:55` will be written into the "myusers-2020-03-27" index.
-
-You can also use `'{now()|date_format_string}'` to convert the current system time to the format specified by `date_format_string`. The corresponding time type of `now()` is `TIMESTAMP_WITH_LTZ`.
-When formatting the system time as a string, the time zone configured in the session through `table.local-time-zone` is used. Any of `NOW()`, `now()`, `CURRENT_TIMESTAMP`, and `current_timestamp` can be used.
-
-**NOTE:** When using a dynamic index generated from the current system time, there is no guarantee for a changelog stream that records with the same primary key produce the same index name.
-Therefore, a dynamic index based on the system time can only be used with append-only streams.
-
-Data Type Mapping
-----------------
-
-Elasticsearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
-Flink uses the built-in `'json'` format for the Elasticsearch connector. Please refer to the [JSON Format]({{< ref "docs/connectors/table/formats/json" >}}) page for more type mapping details.
-
-{{< top >}}
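
For quick reference, the dynamic index option described in the removed SQL connector page composes with the earlier DDL example roughly as follows; this is a sketch with hypothetical table and column names, using only the connector options documented above:

```sql
CREATE TABLE myUserTable_daily (
    user_id   STRING,
    user_name STRING,
    log_ts    TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts'     = 'http://localhost:9200',
    -- dynamic index: each record is routed to an index derived from its log_ts value,
    -- e.g. a row with log_ts = 2020-03-27 12:25:55 is written to "myusers-2020-03-27"
    'index'     = 'myusers-{log_ts|yyyy-MM-dd}'
);
```

Because the index is derived from a field of the record rather than from `now()`, this form also works for changelog (upsert) streams, since rows with the same primary key always resolve to the same index name.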
