Repository: nifi
Updated Branches:
  refs/heads/master 9e884f612 -> fa5fed9bb


NIFI-3082: Fixed status code handling in PutElasticsearchHttp

This closes #1258.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fa5fed9b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fa5fed9b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fa5fed9b

Branch: refs/heads/master
Commit: fa5fed9bb59eb485e48dd7350bf693a3039307ef
Parents: 9e884f6
Author: Matt Burgess <[email protected]>
Authored: Tue Nov 22 12:53:45 2016 -0500
Committer: Pierre Villard <[email protected]>
Committed: Tue Nov 22 21:32:03 2016 +0100

----------------------------------------------------------------------
 .../processors/elasticsearch/PutElasticsearchHttp.java  | 12 +++++++++---
 .../elasticsearch/TestPutElasticsearchHttp.java         | 11 ++++++++++-
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fa5fed9b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 92b1452..7117100 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -358,9 +358,15 @@ public class PutElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                     session.transfer(flowFilesToTransfer, REL_FAILURE);
                     context.yield();
                 }
-            } else {
-                // Something went wrong during the bulk update, throw a 
ProcessException to indicate rollback
-                throw new ProcessException("Received error code " + statusCode 
+ " from Elasticsearch API");
+            } else if (statusCode / 100 == 5) {
+                // 5xx -> RETRY, but a server error might last a while, so 
yield
+                logger.warn("Elasticsearch returned code {} with message {}, 
transferring flow file to retry. This is likely a server problem, yielding...",
+                        new Object[]{statusCode, getResponse.message()});
+                session.transfer(flowFilesToTransfer, REL_RETRY);
+                context.yield();
+            } else {  // 1xx, 3xx, 4xx, etc. -> NO RETRY
+                logger.warn("Elasticsearch returned code {} with message {}, 
transferring flow file to failure", new Object[]{statusCode, 
getResponse.message()});
+                session.transfer(flowFilesToTransfer, REL_FAILURE);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fa5fed9b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index c3d5a34..1172004 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -167,7 +167,7 @@ public class TestPutElasticsearchHttp {
         runner.assertNotValid();
     }
 
-    @Test(expected = AssertionError.class)
+    @Test
     public void testPutElasticSearchOnTriggerWithFailures() throws IOException 
{
         PutElasticsearchTestProcessor processor = new 
PutElasticsearchTestProcessor(true);
         processor.setStatus(100, "Should fail");
@@ -183,6 +183,15 @@ public class TestPutElasticsearchHttp {
             put("doc_id", "28039652140");
         }});
         runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        runner.clearTransferState();
+
+        processor.setStatus(500, "Should retry");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_RETRY, 
1);
     }
 
     @Test

Reply via email to