[FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink
This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a failure handler to control how failed action requests are dealt with. The commit also includes general improvements to FLINK-5122: 1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not available for Elasticsearch 1.x) 2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3743e898 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3743e898 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3743e898 Branch: refs/heads/master Commit: 3743e898104d79a9813d444d38fa9f86617bb5ef Parents: aaac7c2 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Jan 30 13:55:26 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Feb 24 22:58:40 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/elasticsearch.md | 121 +++++++++++-- .../ActionRequestFailureHandler.java | 72 ++++++++ .../ElasticsearchApiCallBridge.java | 12 ++ .../elasticsearch/ElasticsearchSinkBase.java | 174 ++++++++++++------- .../util/NoOpActionRequestFailureHandler.java | 37 ++++ .../Elasticsearch1ApiCallBridge.java | 10 ++ .../elasticsearch/ElasticsearchSink.java | 37 +++- .../Elasticsearch2ApiCallBridge.java | 31 ++++ .../elasticsearch2/ElasticsearchSink.java | 29 +++- .../Elasticsearch5ApiCallBridge.java | 31 ++++ .../elasticsearch5/ElasticsearchSink.java | 31 +++- 11 files changed, 499 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/docs/dev/connectors/elasticsearch.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 4388b9a..2ca1f9b 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -23,6 +23,9 @@ specific language governing permissions and limitations under the License. --> +* This will be replaced by the TOC +{:toc} + This connector provides sinks that can request document actions to an [Elasticsearch](https://elastic.co/) Index. To use this connector, add one of the following dependencies to your project, depending on the version @@ -59,14 +62,14 @@ Note that the streaming connectors are currently not part of the binary distribution. See [here]({{site.baseurl}}/dev/linking.html) for information about how to package the program with the libraries for cluster execution. -#### Installing Elasticsearch +## Installing Elasticsearch Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. This must be set when creating an `ElasticsearchSink` for requesting document actions against your cluster. -#### Elasticsearch Sink +## Elasticsearch Sink The `ElasticsearchSink` uses a `TransportClient` to communicate with an Elasticsearch cluster. @@ -200,15 +203,13 @@ request for each incoming element. Generally, the `ElasticsearchSinkFunction` can be used to perform multiple requests of different types (ex., `DeleteRequest`, `UpdateRequest`, etc.). -Internally, the sink uses a `BulkProcessor` to send action requests to the cluster. -This will buffer elements before sending them in bulk to the cluster. The behaviour of the -`BulkProcessor` can be set using these config keys in the provided `Map` configuration: - * **bulk.flush.max.actions**: Maximum amount of elements to buffer - * **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer - * **bulk.flush.interval.ms**: Interval at which to flush data regardless of the other two - settings in milliseconds +Internally, each parallel instance of the Flink Elasticsearch Sink uses +a `BulkProcessor` to send action requests to the cluster. +This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor` +executes bulk requests one at a time, i.e. there will be no two concurrent +flushes of the buffered actions in progress. -#### Communication using Embedded Node (only for Elasticsearch 1.x) +### Communication using Embedded Node (only for Elasticsearch 1.x) For Elasticsearch versions 1.x, communication using an embedded node is also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) @@ -272,14 +273,106 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes. -Optionally, the sink can try to re-execute the bulk request when the error -message matches certain patterns indicating a timeout or a overloaded cluster. -This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`. +### Handling Failing Elasticsearch Requests + +Elasticsearch action requests may fail due to a variety of reasons, including +temporarily saturated node queue capacity or malformed documents to be indexed. +The Flink Elasticsearch Sink allows the user to specify how request +failures are handled, by simply implementing an `ActionRequestFailureHandler` and +providing it to the constructor. + +Below is an example: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataStream<String> input = ...; + +input.addSink(new ElasticsearchSink<>( + config, transportAddresses, + new ElasticsearchSinkFunction<String>() {...}, + new ActionRequestFailureHandler() { + @Override + boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { + // this example uses Apache Commons to search for nested exceptions + + if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) { + // full queue; re-add document for indexing + indexer.add(action); + return false; + } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) { + // malformed document; simply drop request without failing sink + return false; + } else { + // for all other failures, fail the sink + return true; + } + } +})); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val input: DataStream[String] = ... + +input.addSink(new ElasticsearchSink( + config, transportAddresses, + new ElasticsearchSinkFunction[String] {...}, + new ActionRequestFailureHandler { + override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { + // this example uses Apache Commons to search for nested exceptions + + if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) { + // full queue; re-add document for indexing + indexer.add(action) + return false + } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) { + // malformed document; simply drop request without failing sink + return false + } else { + // for all other failures, fail the sink + return true + } + } +})) +{% endhighlight %} +</div> +</div> +The above example will let the sink re-add requests that failed due to +queue capacity saturation and drop requests with malformed documents, without +failing the sink. For all other failures, the sink will fail. If a `ActionRequestFailureHandler` +is not provided to the constructor, the sink will fail for any kind of error. + +Note that `onFailure` is called for failures that still occur only after the +`BulkProcessor` internally finishes all backoff retry attempts. +By default, the `BulkProcessor` retries to a maximum of 8 attempts with +an exponential backoff. For more information on the behaviour of the +internal `BulkProcessor` and how to configure it, please see the following section. + +### Configuring the Internal Bulk Processor + +The internal `BulkProcessor` can be further configured for its behaviour +on how buffered action requests are flushed, by setting the following values in +the provided `Map<String, String>`: + + * **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing. + * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing. + * **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions. + +For versions 2.x and above, configuring how temporary request errors are +retried is also supported: + + * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush + if one or more of its actions failed due to a temporary `EsRejectedExecutionException`. + * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL` + * **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this + is simply the delay between each retry. For exponential backoff, this is the initial base delay. + * **bulk.flush.backoff.retries**: The amount of backoff retries to attempt. More information about Elasticsearch can be found [here](https://elastic.co). -#### Packaging the Elasticsearch Connector into an Uber-Jar +## Packaging the Elasticsearch Connector into an Uber-Jar For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java new file mode 100644 index 0000000..45d04fc --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +/** + * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed + * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or + * simply requesting them to be sent to Elasticsearch again if the failure is only temporary. + * + * <p> + * Example: + * + * <pre>{@code + * + * private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler { + * + * @Override + * boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { + * // this example uses Apache Commons to search for nested exceptions + * + * if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) { + * // full queue; re-add document for indexing + * indexer.add(action); + * return false; + * } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) { + * // malformed document; simply drop request without failing sink + * return false; + * } else { + * // for all other failures, fail the sink + * return true; + * } + * } + * } + * + * }</pre> + * + * <p> + * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests + * with malformed documents, without failing the sink. For all other failures, the sink will fail. + */ +public interface ActionRequestFailureHandler extends Serializable { + + /** + * Handle a failed {@link ActionRequest}. + * + * @param action the {@link ActionRequest} that failed due to the failure + * @param failure the cause of failure + * @param indexer request indexer to re-add the failed action, if intended to do so + * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise + */ + boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java index 6298a85..b482432 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.Client; import javax.annotation.Nullable; @@ -53,6 +54,17 @@ public interface ElasticsearchApiCallBridge extends Serializable { @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse); /** + * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}. + * The builder will be later on used to instantiate the actual {@link BulkProcessor}. + * + * @param builder the {@link BulkProcessor.Builder} to configure. + * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries). + */ + void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy); + + /** * Perform any necessary state cleanup. */ void cleanup(); http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java index 7977fc0..2c29865 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java @@ -21,24 +21,23 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.util.InstantiationUtil; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -70,10 +69,56 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay"; + + public enum FlushBackoffType { + CONSTANT, + EXPONENTIAL + } + + public class BulkFlushBackoffPolicy implements Serializable { + + private static final long serialVersionUID = -6022851996101826049L; + + // the default values follow the Elasticsearch default settings for BulkProcessor + private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL; + private int maxRetryCount = 8; + private long delayMillis = 50; + + public FlushBackoffType getBackoffType() { + return backoffType; + } + + public int getMaxRetryCount() { + return maxRetryCount; + } + + public long getDelayMillis() { + return delayMillis; + } + + public void setBackoffType(FlushBackoffType backoffType) { + this.backoffType = checkNotNull(backoffType); + } + + public void setMaxRetryCount(int maxRetryCount) { + checkArgument(maxRetryCount > 0); + this.maxRetryCount = maxRetryCount; + } + + public void setDelayMillis(long delayMillis) { + checkArgument(delayMillis > 0); + this.delayMillis = delayMillis; + } + } private final Integer bulkProcessorFlushMaxActions; private final Integer bulkProcessorFlushMaxSizeMb; private final Integer bulkProcessorFlushIntervalMillis; + private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy; // ------------------------------------------------------------------------ // User-facing API and configuration @@ -85,16 +130,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { /** The function that is used to construct mulitple {@link ActionRequest ActionRequests} from each incoming element. */ private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; + /** User-provided handler for failed {@link ActionRequest ActionRequests}. */ + private final ActionRequestFailureHandler failureHandler; + /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */ private transient BulkProcessorIndexer requestIndexer; - /** - * When set to <code>true</code> and the bulk action fails, the error message will be checked for - * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node. - * When a matching pattern is found, the bulk will be retried. - */ - protected boolean checkErrorAndRetryBulk = false; - // ------------------------------------------------------------------------ // Internals for the Flink Elasticsearch Sink // ------------------------------------------------------------------------ @@ -109,21 +150,28 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { private transient BulkProcessor bulkProcessor; /** - * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks. + * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and + * the user considered it should fail the sink via the + * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method. + * + * Errors will be checked and rethrown before processing each input element, and when the sink is closed. */ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); public ElasticsearchSinkBase( ElasticsearchApiCallBridge callBridge, Map<String, String> userConfig, - ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { this.callBridge = checkNotNull(callBridge); this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction); + this.failureHandler = checkNotNull(failureHandler); - // we eagerly check if the user-provided sink function is serializable; - // otherwise, if it isn't serializable, users will merely get a non-informative error message + // we eagerly check if the user-provided sink function and failure handler is serializable; + // otherwise, if they aren't serializable, users will merely get a non-informative error message // "ElasticsearchSinkBase is not serializable" + try { InstantiationUtil.serializeObject(elasticsearchSinkFunction); } catch (Exception e) { @@ -132,10 +180,19 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { "The object probably contains or references non serializable fields."); } - checkNotNull(userConfig); + try { + InstantiationUtil.serializeObject(failureHandler); + } catch (Exception e) { + throw new IllegalArgumentException( + "The implementation of the provided ActionRequestFailureHandler is not serializable. " + + "The object probably contains or references non serializable fields."); + } // extract and remove bulk processor related configuration from the user-provided config, // so that the resulting user config only contains configuration related to the Elasticsearch client. + + checkNotNull(userConfig); + ParameterTool params = ParameterTool.fromMap(userConfig); if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { @@ -159,6 +216,31 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { bulkProcessorFlushIntervalMillis = null; } + boolean bulkProcessorFlushBackoffEnable = params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE); + + if (bulkProcessorFlushBackoffEnable) { + this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy(); + + if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) { + bulkProcessorFlushBackoffPolicy.setBackoffType(FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE))); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) { + bulkProcessorFlushBackoffPolicy.setMaxRetryCount(params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) { + bulkProcessorFlushBackoffPolicy.setDelayMillis(params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY); + } + + } else { + bulkProcessorFlushBackoffPolicy = null; + } + this.userConfig = userConfig; } @@ -175,48 +257,30 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { - boolean allRequestsRepeatable = true; + BulkItemResponse itemResponse; + Throwable failure; - for (BulkItemResponse itemResp : response.getItems()) { - Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp); + for (int i = 0; i < response.getItems().length; i++) { + itemResponse = response.getItems()[i]; + failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); if (failure != null) { - String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); - - // Check if index request can be retried - if (checkErrorAndRetryBulk && ( - failureMessageLowercase.contains("timeout") || - failureMessageLowercase.contains("timed out") || // Generic timeout errors - failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down - (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full - { - LOG.debug("Retry bulk: {}", itemResp.getFailureMessage()); - } else { - // Cannot retry action - allRequestsRepeatable = false; - LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); + LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure); + if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) { failureThrowable.compareAndSet(null, failure); } } } - - if (allRequestsRepeatable) { - reAddBulkRequest(request); - } } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - if (checkErrorAndRetryBulk && ( - failure instanceof ClusterBlockException // Examples: "no master" - || failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet - ) - { - LOG.debug("Retry bulk on throwable: {}", failure.getMessage()); - reAddBulkRequest(request); - } else { - LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure); - failureThrowable.compareAndSet(null, failure); + LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause()); + + // whole bulk request failures are usually just temporary timeouts on + // the Elasticsearch side; simply retry all action requests in the bulk + for (ActionRequest action : request.requests()) { + requestIndexer.add(action); } } } @@ -237,6 +301,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis)); } + // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null + callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy); + bulkProcessor = bulkProcessorBuilder.build(); requestIndexer = new BulkProcessorIndexer(bulkProcessor); } @@ -267,21 +334,6 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { checkErrorAndRethrow(); } - /** - * Adds all requests of the bulk to the BulkProcessor. Used when trying again. - * @param bulkRequest - */ - public void reAddBulkRequest(BulkRequest bulkRequest) { - //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs. - - for (IndicesRequest req : bulkRequest.subRequests()) { - if (req instanceof ActionRequest) { - // There is no waiting time between index requests, so this may produce additional pressure on cluster - bulkProcessor.add((ActionRequest<?>) req); - } - } - } - private void checkErrorAndRethrow() { Throwable cause = failureThrowable.get(); if (cause != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java new file mode 100644 index 0000000..09173a2 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.util; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.elasticsearch.action.ActionRequest; + +/** + * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures. + */ +public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler { + + private static final long serialVersionUID = 737941343410827885L; + + @Override + public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { + // simply fail the sink + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java index 098afa9..8a59da9 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch; import org.apache.flink.util.Preconditions; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -27,6 +28,7 @@ import org.elasticsearch.node.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -119,6 +121,14 @@ public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge { } @Override + public void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + // Elasticsearch 1.x does not support backoff retries for failed bulk requests + LOG.warn("Elasticsearch 1.x does not support backoff retries."); + } + + @Override public void cleanup() { if (node != null && !node.isClosed()) { node.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java index c338860..375d739 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.elasticsearch; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.index.IndexRequest; @@ -105,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction); + this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); } /** @@ -116,6 +117,38 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction); + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + } + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}. + * + * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + * @param failureHandler This is used to handle failed {@link ActionRequest} + */ + public ElasticsearchSink( + Map<String, String> userConfig, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { + + super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction, failureHandler); + } + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. + * + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + * @param failureHandler This is used to handle failed {@link ActionRequest} + */ + public ElasticsearchSink( + Map<String, String> userConfig, + List<TransportAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { + + super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java index 9407d9f..e85daf5 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java @@ -18,16 +18,21 @@ package org.apache.flink.streaming.connectors.elasticsearch2; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils; import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; @@ -84,6 +89,32 @@ public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge { } @Override + public void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + + BackoffPolicy backoffPolicy; + if (flushBackoffPolicy != null) { + switch (flushBackoffPolicy.getBackoffType()) { + case CONSTANT: + backoffPolicy = BackoffPolicy.constantBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + break; + case EXPONENTIAL: + default: + backoffPolicy = BackoffPolicy.exponentialBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + } + } else { + backoffPolicy = BackoffPolicy.noBackoff(); + } + + builder.setBackoffPolicy(backoffPolicy); + } + + @Override public void cleanup() { // nothing to cleanup } http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java index a0abc51..2210f63 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java @@ -16,7 +16,9 @@ */ package org.apache.flink.streaming.connectors.elasticsearch2; +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.transport.TransportClient; @@ -82,9 +84,28 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ - public ElasticsearchSink(Map<String, String> userConfig, - List<InetSocketAddress> transportAddresses, - org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction); + public ElasticsearchSink( + Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + } + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. + * + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + * @param failureHandler This is used to handle failed {@link ActionRequest} + */ + public ElasticsearchSink( + Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { + + super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java index 1389e7d..c7d81f5 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java @@ -18,19 +18,24 @@ package org.apache.flink.streaming.connectors.elasticsearch5; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils; import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.transport.Netty3Plugin; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; @@ -90,6 +95,32 @@ public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge { } @Override + public void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + + BackoffPolicy backoffPolicy; + if (flushBackoffPolicy != null) { + switch (flushBackoffPolicy.getBackoffType()) { + case CONSTANT: + backoffPolicy = BackoffPolicy.constantBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + break; + case EXPONENTIAL: + default: + backoffPolicy = BackoffPolicy.exponentialBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + } + } else { + backoffPolicy = BackoffPolicy.noBackoff(); + } + + builder.setBackoffPolicy(backoffPolicy); + } + + @Override public void cleanup() { // nothing to cleanup } http://git-wip-us.apache.org/repos/asf/flink/blob/3743e898/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java index 9107d4e..175b4fa 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java @@ -16,8 +16,10 @@ */ package org.apache.flink.streaming.connectors.elasticsearch5; +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.transport.TransportClient; @@ -64,13 +66,32 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { /** * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. * - * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ - public ElasticsearchSink(Map<String, String> userConfig, - List<InetSocketAddress> transportAddresses, - ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction); + public ElasticsearchSink( + Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + } + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. + * + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + * @param failureHandler This is used to handle failed {@link ActionRequest} + */ + public ElasticsearchSink( + Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { + + super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler); } }
