Repository: nifi
Updated Branches:
  refs/heads/master 066accc27 -> 7fc7494b2


NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3dbac50
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3dbac50
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3dbac50

Branch: refs/heads/master
Commit: d3dbac50a8f354503838e5a0bdf22872d878b078
Parents: 066accc
Author: Matt Burgess <[email protected]>
Authored: Tue Nov 22 20:08:17 2016 -0500
Committer: joewitt <[email protected]>
Committed: Wed Nov 23 08:11:49 2016 -0500

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearch.java         | 24 ++++++++++++--------
 .../elasticsearch/PutElasticsearchHttp.java     |  2 +-
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d3dbac50/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index f64180b..216efd4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -211,17 +211,21 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
 
             final BulkResponse response = bulk.execute().actionGet();
             if (response.hasFailures()) {
-                for (final BulkItemResponse item : response.getItems()) {
-                    final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId());
-                    if (item.isFailed()) {
-                        logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
-                                new Object[]{flowFile, item.getFailure().getMessage()});
-                        session.transfer(flowFile, REL_FAILURE);
-
-                    } else {
-                        session.transfer(flowFile, REL_SUCCESS);
+                // Responses are guaranteed to be in order, remove them in reverse order
+                BulkItemResponse[] responses = response.getItems();
+                if (responses != null && responses.length > 0) {
+                    for (int i = responses.length - 1; i >= 0; i--) {
+                        final FlowFile flowFile = flowFilesToTransfer.get(i);
+                        if (responses[i].isFailed()) {
+                            logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+                                    new Object[]{flowFile, responses[i].getFailure().getMessage()});
+                            session.transfer(flowFile, REL_FAILURE);
+
+                        } else {
+                            session.transfer(flowFile, REL_SUCCESS);
+                        }
+                        flowFilesToTransfer.remove(flowFile);
                     }
-                    flowFilesToTransfer.remove(flowFile);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3dbac50/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 7117100..3ba46bb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -328,7 +328,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                         if (itemNodeArray.size() > 0) {
                             // All items are returned whether they succeeded or failed, so iterate through the item array
                             // at the same time as the flow file list, moving each to success or failure accordingly
-                            for (int i = 0; i < itemNodeArray.size(); i++) {
+                            for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
                                 JsonNode itemNode = itemNodeArray.get(i);
                                 FlowFile flowFile = flowFilesToTransfer.remove(i);
                                 int status = itemNode.findPath("status").asInt();

Reply via email to