Repository: nifi
Updated Branches:
  refs/heads/master 066accc27 -> 7fc7494b2
NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http)

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3dbac50
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3dbac50
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3dbac50

Branch: refs/heads/master
Commit: d3dbac50a8f354503838e5a0bdf22872d878b078
Parents: 066accc
Author: Matt Burgess <[email protected]>
Authored: Tue Nov 22 20:08:17 2016 -0500
Committer: joewitt <[email protected]>
Committed: Wed Nov 23 08:11:49 2016 -0500

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearch.java         | 24 ++++++++++++--------
 .../elasticsearch/PutElasticsearchHttp.java     |  2 +-
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3dbac50/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index f64180b..216efd4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -211,17 +211,21 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
             final BulkResponse response = bulk.execute().actionGet();
             if (response.hasFailures()) {
-                for (final BulkItemResponse item : response.getItems()) {
-                    final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId());
-                    if (item.isFailed()) {
-                        logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
-                                new Object[]{flowFile, item.getFailure().getMessage()});
-                        session.transfer(flowFile, REL_FAILURE);
-
-                    } else {
-                        session.transfer(flowFile, REL_SUCCESS);
+                // Responses are guaranteed to be in order, remove them in reverse order
+                BulkItemResponse[] responses = response.getItems();
+                if (responses != null && responses.length > 0) {
+                    for (int i = responses.length - 1; i >= 0; i--) {
+                        final FlowFile flowFile = flowFilesToTransfer.get(i);
+                        if (responses[i].isFailed()) {
+                            logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+                                    new Object[]{flowFile, responses[i].getFailure().getMessage()});
+                            session.transfer(flowFile, REL_FAILURE);
+
+                        } else {
+                            session.transfer(flowFile, REL_SUCCESS);
+                        }
+                        flowFilesToTransfer.remove(flowFile);
                     }
-                    flowFilesToTransfer.remove(flowFile);
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3dbac50/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 7117100..3ba46bb 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -328,7 +328,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
                     if (itemNodeArray.size() > 0) {
                         // All items are returned whether they succeeded or failed, so iterate through the item array
                         // at the same time as the flow file list, moving each to success or failure accordingly
-                        for (int i = 0; i < itemNodeArray.size(); i++) {
+                        for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
                             JsonNode itemNode = itemNodeArray.get(i);
                             FlowFile flowFile = flowFilesToTransfer.remove(i);
                             int status = itemNode.findPath("status").asInt();
