This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6571df5 [FLINK-18359] Log failures in handler instead of
ElasticsearchSinkBase
6571df5 is described below
commit 6571df54669b22cf0211d170de030d01a2221d60
Author: rinkako <[email protected]>
AuthorDate: Thu Jun 18 19:56:17 2020 +0800
[FLINK-18359] Log failures in handler instead of ElasticsearchSinkBase
This gives the failure handler control over whether or not to log error
messages. In some cases, users know that they will get a lot of
failures, for example when back-filling existing data in ES. In those
cases, you don't want your log flooded with ERROR messages.
---
.../streaming/connectors/elasticsearch/ElasticsearchSinkBase.java | 8 --------
.../connectors/elasticsearch/util/NoOpFailureHandler.java | 5 +++++
.../elasticsearch/util/RetryRejectedExecutionFailureHandler.java | 5 +++++
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2811008..d19fba6 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -39,8 +39,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.HashMap;
@@ -71,8 +69,6 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
private static final long serialVersionUID = -1007596293618451942L;
- private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
-
//
------------------------------------------------------------------------
// Internal bulk processor configuration
//
------------------------------------------------------------------------
@@ -408,8 +404,6 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
itemResponse =
response.getItems()[i];
failure =
callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
- LOG.error("Failed
Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
-
restStatus =
itemResponse.getFailure().getStatus();
actionRequest =
request.requests().get(i);
if (restStatus == null)
{
@@ -441,8 +435,6 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
- LOG.error("Failed Elasticsearch bulk request: {}",
failure.getMessage(), failure);
-
try {
for (DocWriteRequest writeRequest :
request.requests()) {
if (writeRequest instanceof
ActionRequest) {
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index 4726dc1..c076fc8 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -22,6 +22,8 @@ import
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureH
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link ActionRequestFailureHandler} that simply fails the sink on any
failures.
@@ -31,8 +33,11 @@ public class NoOpFailureHandler implements
ActionRequestFailureHandler {
private static final long serialVersionUID = 737941343410827885L;
+ private static final Logger LOG =
LoggerFactory.getLogger(NoOpFailureHandler.class);
+
@Override
public void onFailure(ActionRequest action, Throwable failure, int
restStatusCode, RequestIndexer indexer) throws Throwable {
+ LOG.error("Failed Elasticsearch item request: {}",
failure.getMessage(), failure);
// simply fail the sink
throw failure;
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index ca710cb..98b58f9 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -25,6 +25,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link ActionRequestFailureHandler} that re-adds requests that failed
due to temporary
@@ -36,8 +38,11 @@ public class RetryRejectedExecutionFailureHandler implements
ActionRequestFailur
private static final long serialVersionUID = -7423562912824511906L;
+ private static final Logger LOG =
LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);
+
@Override
public void onFailure(ActionRequest action, Throwable failure, int
restStatusCode, RequestIndexer indexer) throws Throwable {
+ LOG.error("Failed Elasticsearch item request: {}",
failure.getMessage(), failure);
if (ExceptionUtils.findThrowable(failure,
EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else {