This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6571df5  [FLINK-18359] Log failures in handler instead of 
ElasticsearchSinkBase
6571df5 is described below

commit 6571df54669b22cf0211d170de030d01a2221d60
Author: rinkako <[email protected]>
AuthorDate: Thu Jun 18 19:56:17 2020 +0800

    [FLINK-18359] Log failures in handler instead of ElasticsearchSinkBase
    
    This allows more control for the handler whether or not to log error
    messages. In some cases, users know that they will get a lot of
    failures, for example when back-filling existing data in ES. For those,
    you don't want your log flooded with ERROR messages.
---
 .../streaming/connectors/elasticsearch/ElasticsearchSinkBase.java | 8 --------
 .../connectors/elasticsearch/util/NoOpFailureHandler.java         | 5 +++++
 .../elasticsearch/util/RetryRejectedExecutionFailureHandler.java  | 5 +++++
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2811008..d19fba6 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -39,8 +39,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.rest.RestStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -71,8 +69,6 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
 
        private static final long serialVersionUID = -1007596293618451942L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
-
        // 
------------------------------------------------------------------------
        //  Internal bulk processor configuration
        // 
------------------------------------------------------------------------
@@ -408,8 +404,6 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
                                                itemResponse = 
response.getItems()[i];
                                                failure = 
callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                                                if (failure != null) {
-                                                       LOG.error("Failed 
Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
-
                                                        restStatus = 
itemResponse.getFailure().getStatus();
                                                        actionRequest = 
request.requests().get(i);
                                                        if (restStatus == null) 
{
@@ -441,8 +435,6 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
 
                @Override
                public void afterBulk(long executionId, BulkRequest request, 
Throwable failure) {
-                       LOG.error("Failed Elasticsearch bulk request: {}", 
failure.getMessage(), failure);
-
                        try {
                                for (DocWriteRequest writeRequest : 
request.requests()) {
                                        if (writeRequest instanceof 
ActionRequest) {
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
index 4726dc1..c076fc8 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureH
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 
 import org.elasticsearch.action.ActionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link ActionRequestFailureHandler} that simply fails the sink on any 
failures.
@@ -31,8 +33,11 @@ public class NoOpFailureHandler implements 
ActionRequestFailureHandler {
 
        private static final long serialVersionUID = 737941343410827885L;
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(NoOpFailureHandler.class);
+
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+               LOG.error("Failed Elasticsearch item request: {}", 
failure.getMessage(), failure);
                // simply fail the sink
                throw failure;
        }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index ca710cb..98b58f9 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -25,6 +25,8 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link ActionRequestFailureHandler} that re-adds requests that failed 
due to temporary
@@ -36,8 +38,11 @@ public class RetryRejectedExecutionFailureHandler implements 
ActionRequestFailur
 
        private static final long serialVersionUID = -7423562912824511906L;
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);
+
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int 
restStatusCode, RequestIndexer indexer) throws Throwable {
+               LOG.error("Failed Elasticsearch item request: {}", 
failure.getMessage(), failure);
                if (ExceptionUtils.findThrowable(failure, 
EsRejectedExecutionException.class).isPresent()) {
                        indexer.add(action);
                } else {

Reply via email to