[
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612365#comment-16612365
]
ASF GitHub Bot commented on FLINK-3875:
---------------------------------------
dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r217056983
##########
File path: docs/dev/table/connect.md
##########
@@ -583,6 +584,104 @@ Make sure to add the version-specific Kafka dependency. In addition, a correspon
{% top %}
+### Elasticsearch Connector
+
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+<span class="label label-info">Format: JSON-only</span>
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.
+
+The connector operates in [upsert mode](#update-modes) and exchanges UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion). It can be defined as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")                 // optional: delimiter for composite keys ("_" by default)
+                                       //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")             // optional: representation for null fields in keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failureHandlerFail()              // optional: throws an exception if a request fails and causes a job failure
+    .failureHandlerIgnore()            //   or ignores failures and drops the request
+    .failureHandlerRetryRejected()     //   or re-adds requests that have failed due to queue capacity saturation
+    .failureHandlerCustom(...)         //   or custom failure handling with an ActionRequestFailureHandler subclass
+
+    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disableFlushOnCheckpoint()        // optional: disables flushing on checkpoint (see notes below!)
+    .bulkFlushMaxActions(42)           // optional: maximum number of actions to buffer for each bulk request
+    .bulkFlushMaxSize(42)              // optional: maximum size of buffered actions (in MB) per bulk request
+    .bulkFlushInterval(60000L)         // optional: bulk flush interval (in milliseconds)
+
+    .bulkFlushBackoffConstant()        // optional: use a constant backoff type
+    .bulkFlushBackoffExponential()     //   or use an exponential backoff type
+    .bulkFlushBackoffMaxRetries(3)     // optional: maximum number of retries
+    .bulkFlushBackoffDelay(30000L)     // optional: delay between each backoff attempt (in milliseconds)
+
+    // optional: connection properties to be used during REST communication to Elasticsearch
+    .connectionMaxRetryTimeout(3)      // optional: maximum timeout (in milliseconds) between retries
+    .connectionPathPrefix("/v1")       // optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: elasticsearch
+  version: 6                # required: valid connector versions are "6"
+  hosts:                    # required: one or more Elasticsearch hosts to connect to
+    - hostname: "localhost"
+      port: 9200
+      schema: "http"
+  index: "MyUsers"          # required: Elasticsearch index
+  document-type: "user"     # required: Elasticsearch document type
+
+  key-delimiter: "$"        # optional: delimiter for composite keys ("_" by default)
+                            #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+  key-null-literal: "n/a"   # optional: representation for null fields in keys ("null" by default)
+
+  # optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
+  failure-handler: ...      # valid strategies are "fail" (throws an exception if a request fails and
+                            #   thus causes a job failure), "ignore" (ignores failures and drops the request),
+                            #   "retry-rejected" (re-adds requests that have failed due to queue capacity
+                            #   saturation), or "custom" for failure handling with an
+                            #   ActionRequestFailureHandler subclass
+
+  # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+  flush-on-checkpoint: true # optional: flushing on checkpoint; set to false to disable (see notes below!)
+                            #   ("true" by default)
+  bulk-flush:
+    max-actions: 42         # optional: maximum number of actions to buffer for each bulk request
+    max-size: 42            # optional: maximum size of buffered actions (in MB) per bulk request
+    interval: 60000         # optional: bulk flush interval (in milliseconds)
+    back-off:               # optional: backoff strategy ("disabled" by default)
+      type: ...             # valid strategies are "disabled", "constant", or "exponential"
+      max-retries: 3        # optional: maximum number of retries
+      delay: 30000          # optional: delay between each backoff attempt (in milliseconds)
+
+  # optional: connection properties to be used during REST communication to Elasticsearch
+  connection-max-retry-timeout: 3   # optional: maximum timeout (in milliseconds) between retries
+  connection-path-prefix: "/v1"     # optional: prefix string to be added to every REST communication
+{% endhighlight %}
+</div>
+</div>
+
+**Bulk flushing:** For more information about the characteristics of the optional bulk flushing parameters, see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).
+
+**Disabling flushing on checkpoint:** When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, the sink does NOT provide any strong guarantees for at-least-once delivery of action requests.
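+Conversely, at-least-once delivery requires checkpointing to be enabled and flushing on checkpoint to stay enabled (the default). A rough sketch, not from the snippet above and assuming an otherwise regular streaming program:
+
+{% highlight java %}
+// minimal sketch: enable checkpointing and simply do not call
+// .disableFlushOnCheckpoint() (or keep flush-on-checkpoint: true in YAML),
+// so the sink waits for all pending Elasticsearch action requests to be
+// acknowledged before a checkpoint completes
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(10000L); // checkpoint every 10 seconds
+{% endhighlight %}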
+
+**Key extraction:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined.
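+For illustration, a sketch of how the document ID is derived (the table, field values, and delimiter below are made up):
+
+{% highlight java %}
+// hypothetical query: fields 'a' and 'b' form the composite key
+Table result = tableEnv.sqlQuery("SELECT a, b, COUNT(c) AS cnt FROM t GROUP BY a, b");
+
+// with .keyDelimiter("$"), a result row with a = 'alice' and b = 'smith' is
+// written to Elasticsearch under the document ID "alice$smith"; if 'b' were
+// NULL, the configured .keyNullLiteral(...) ("null" by default) would be used instead
+{% endhighlight %}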
+
+A JSON format defines how to encode documents for the external system; therefore, it must be added as a dependency.
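+For example, attaching the JSON format and registering the sink could look like the following sketch (the table name, schema, and the flink-json dependency on the classpath are assumptions, not part of the snippet above):
+
+{% highlight java %}
+tableEnv
+  .connect(
+    new Elasticsearch()
+      .version("6")
+      .host("localhost", 9200, "http")
+      .index("MyUsers")
+      .documentType("user"))
+  .withFormat(
+    new Json().deriveSchema())   // encode documents as JSON, derived from the schema below
+  .withSchema(
+    new Schema()
+      .field("user", "VARCHAR")
+      .field("cnt", "BIGINT"))
+  .inUpsertMode()                // exchange UPSERT/DELETE messages with Elasticsearch
+  .registerTableSink("MyUserTable");
+{% endhighlight %}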
Review comment:
How about linking to the JSON format in the dependencies table?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add a TableSink for Elasticsearch
> ---------------------------------
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors, Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)