Repository: nifi
Updated Branches:
  refs/heads/master e74c67f77 -> 4df3eb567


NIFI-4410: Improved error handling/logging in PutElasticsearchHttp processors

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2175.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4df3eb56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4df3eb56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4df3eb56

Branch: refs/heads/master
Commit: 4df3eb567d8dff396b0e2380949e971d074dd04b
Parents: e74c67f
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Mon Sep 25 22:25:29 2017 -0400
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Fri Feb 2 10:35:03 2018 +0100

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearchHttp.java     | 39 +++++++++++++-------
 .../PutElasticsearchHttpRecord.java             | 18 +++++++--
 .../elasticsearch/TestPutElasticsearchHttp.java | 32 +++++++++++++++-
 3 files changed, 71 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4df3eb56/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 39836fd..1f9cb73 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -355,21 +355,34 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                         ArrayNode itemNodeArray = (ArrayNode) 
responseJson.get("items");
                         if (itemNodeArray.size() > 0) {
                             // All items are returned whether they succeeded 
or failed, so iterate through the item array
-                            // at the same time as the flow file list, moving 
each to success or failure accordingly
+                            // at the same time as the flow file list, moving 
each to success or failure accordingly,
+                            // but only keep the first error for logging
+                            String errorReason = null;
                             for (int i = itemNodeArray.size() - 1; i >= 0; 
i--) {
                                 JsonNode itemNode = itemNodeArray.get(i);
-                                FlowFile flowFile = 
flowFilesToTransfer.remove(i);
-                                int status = 
itemNode.findPath("status").asInt();
-                                if (!isSuccess(status)) {
-                                    String reason = 
itemNode.findPath("//error/reason").asText();
-                                    logger.error("Failed to insert {} into 
Elasticsearch due to {}, transferring to failure",
-                                            new Object[]{flowFile, reason});
-                                    session.transfer(flowFile, REL_FAILURE);
-
-                                } else {
-                                    session.transfer(flowFile, REL_SUCCESS);
-                                    // Record provenance event
-                                    
session.getProvenanceReporter().send(flowFile, url.toString());
+                                if (flowFilesToTransfer.size() > i) {
+                                    FlowFile flowFile = 
flowFilesToTransfer.remove(i);
+                                    int status = 
itemNode.findPath("status").asInt();
+                                    if (!isSuccess(status)) {
+                                        if (errorReason == null) {
+                                            // Use "result" if it is present; 
this happens for status codes like 404 Not Found, which may not have an 
error/reason
+                                            String reason = 
itemNode.findPath("//result").asText();
+                                            if (StringUtils.isEmpty(reason)) {
+                                                // If there was no result, we 
expect an error with a string description in the "reason" field
+                                                reason = 
itemNode.findPath("//error/reason").asText();
+                                            }
+                                            errorReason = reason;
+                                            logger.error("Failed to process {} 
due to {}, transferring to failure",
+                                                    new Object[]{flowFile, 
errorReason});
+                                        }
+                                        flowFile = session.penalize(flowFile);
+                                        session.transfer(flowFile, 
REL_FAILURE);
+
+                                    } else {
+                                        session.transfer(flowFile, 
REL_SUCCESS);
+                                        // Record provenance event
+                                        
session.getProvenanceReporter().send(flowFile, url.toString());
+                                    }
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4df3eb56/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index c1767f1..f65abb0 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -394,14 +394,24 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     ArrayNode itemNodeArray = (ArrayNode) 
responseJson.get("items");
                     if (itemNodeArray.size() > 0) {
                         // All items are returned whether they succeeded or 
failed, so iterate through the item array
-                        // at the same time as the flow file list, logging 
failures accordingly
+                        // at the same time as the flow file list, moving each 
to success or failure accordingly,
+                        // but only keep the first error for logging
+                        String errorReason = null;
                         for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
                             JsonNode itemNode = itemNodeArray.get(i);
                             int status = itemNode.findPath("status").asInt();
                             if (!isSuccess(status)) {
-                                String reason = 
itemNode.findPath("//error/reason").asText();
-                                logger.error("Failed to insert {} into 
Elasticsearch due to {}, transferring to failure",
-                                        new Object[]{flowFile, reason});
+                                if (errorReason == null) {
+                                    // Use "result" if it is present; this 
happens for status codes like 404 Not Found, which may not have an error/reason
+                                    String reason = 
itemNode.findPath("//result").asText();
+                                    if (StringUtils.isEmpty(reason)) {
+                                        // If there was no result, we expect 
an error with a string description in the "reason" field
+                                        reason = 
itemNode.findPath("//error/reason").asText();
+                                    }
+                                    errorReason = reason;
+                                    logger.error("Failed to process {} due to 
{}, transferring to failure",
+                                            new Object[]{flowFile, 
errorReason});
+                                }
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4df3eb56/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 36aa94e..5ca3814 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -350,6 +350,27 @@ public class TestPutElasticsearchHttp {
         assertNotNull(out);
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerWithDocumentNotFound() throws 
IOException {
+        PutElasticsearchTestProcessor processor = new 
PutElasticsearchTestProcessor(true);
+        processor.setResultField("not_found");
+        runner = TestRunners.newTestRunner(processor); // simulate failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "delete");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        runner.clearTransferState();
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -359,6 +380,7 @@ public class TestPutElasticsearchHttp {
         int statusCode = 200;
         String statusMessage = "OK";
         String expectedUrl = null;
+        String resultField = null;
 
         PutElasticsearchTestProcessor(boolean responseHasFailures) {
             this.responseHasFailures = responseHasFailures;
@@ -373,6 +395,10 @@ public class TestPutElasticsearchHttp {
             expectedUrl = url;
         }
 
+        public void setResultField(String resultField) {
+            this.resultField = resultField;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -391,7 +417,11 @@ public class TestPutElasticsearchHttp {
                         if (responseHasFailures) {
                             // This case is for a status code of 200 for the 
bulk response itself, but with an error (of 400) inside
                             
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
-                            
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed 
to parse [gender]\",");
+                            if(resultField != null) {
+                                sb.append("\"result\":{\"not_found\",");
+                            } else {
+                                
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed 
to parse [gender]\",");
+                            }
                             
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected
 end-of-input in VALUE_STRING\\n at ");
                             sb.append("[Source: 
org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, 
column: 39]\"}}}},");
                         }

Reply via email to