Repository: nifi
Updated Branches:
  refs/heads/master f6705f234 -> e1cf37fb8


NIFI-1619: Fix Elasticsearch processor bug when flow file missing ID attribute

This closes #269

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e1cf37fb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e1cf37fb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e1cf37fb

Branch: refs/heads/master
Commit: e1cf37fb8953cc04a8f92382b558cf89c1563a5d
Parents: f6705f2
Author: Matt Burgess <[email protected]>
Authored: Fri Mar 11 11:32:51 2016 -0500
Committer: Aldrin Piri <[email protected]>
Committed: Fri Mar 11 12:01:28 2016 -0500

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearch.java         | 42 +++++++++++---------
 .../elasticsearch/TestPutElasticsearch.java     | 20 ++++++++++
 2 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e1cf37fb/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 244c432..6319791 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -49,6 +49,7 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -156,46 +157,51 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
         }
 
         final ProcessorLog logger = getLogger();
-
+        // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
+        List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
         try {
             final BulkRequestBuilder bulk = esClient.get().prepareBulk();
             if (authToken != null) {
                 bulk.putHeader("Authorization", authToken);
             }
+
             for (FlowFile file : flowFiles) {
                 final String id = file.getAttribute(id_attribute);
                 if (id == null) {
-                    logger.error("No value in identifier attribute {} for {}", 
new Object[]{id_attribute, file});
+                    logger.error("No value in identifier attribute {} for {}, 
transferring to failure", new Object[]{id_attribute, file});
+                    flowFilesToTransfer.remove(file);
                     session.transfer(file, REL_FAILURE);
+                } else {
+                    session.read(file, new InputStreamCallback() {
+                        @Override
+                        public void process(final InputStream in) throws 
IOException {
+                            String json = IOUtils.toString(in, charset)
+                                    .replace("\r\n", " ").replace('\n', ' 
').replace('\r', ' ');
+                            bulk.add(esClient.get().prepareIndex(index, 
docType, id)
+                                    .setSource(json.getBytes(charset)));
+                        }
+                    });
                 }
-                session.read(file, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream in) throws 
IOException {
-                        String json = IOUtils.toString(in, charset)
-                                .replace("\r\n", " ").replace('\n', ' 
').replace('\r', ' ');
-                        bulk.add(esClient.get().prepareIndex(index, docType, 
id)
-                                .setSource(json.getBytes(charset)));
-                    }
-                });
             }
 
             final BulkResponse response = bulk.execute().actionGet();
             if (response.hasFailures()) {
                 for (final BulkItemResponse item : response.getItems()) {
-                    final FlowFile flowFile = flowFiles.get(item.getItemId());
+                    final FlowFile flowFile = 
flowFilesToTransfer.get(item.getItemId());
                     if (item.isFailed()) {
-                        logger.error("Failed to insert {} into Elasticsearch 
due to {}",
+                        logger.error("Failed to insert {} into Elasticsearch 
due to {}, transferring to failure",
                                 new Object[]{flowFile, 
item.getFailure().getMessage()});
                         session.transfer(flowFile, REL_FAILURE);
 
                     } else {
                         session.transfer(flowFile, REL_SUCCESS);
                     }
+                    flowFilesToTransfer.remove(flowFile);
                 }
-            } else {
-                session.transfer(flowFiles, REL_SUCCESS);
             }
 
+            // Transfer any remaining flowfiles to success
+            session.transfer(flowFilesToTransfer, REL_SUCCESS);
 
         } catch (NoNodeAvailableException
                 | ElasticsearchTimeoutException
@@ -209,14 +215,14 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
             logger.error("Failed to insert into Elasticsearch due to {}. More 
detailed information may be available in " +
                             "the NiFi logs.",
                     new Object[]{exceptionToRetry.getLocalizedMessage()}, 
exceptionToRetry);
-            session.transfer(flowFiles, REL_RETRY);
+            session.transfer(flowFilesToTransfer, REL_RETRY);
             context.yield();
 
         } catch (Exception exceptionToFail) {
-            logger.error("Failed to insert into Elasticsearch due to {}",
+            logger.error("Failed to insert into Elasticsearch due to {}, 
transferring to failure",
                     new Object[]{exceptionToFail.getLocalizedMessage()}, 
exceptionToFail);
 
-            session.transfer(flowFiles, REL_FAILURE);
+            session.transfer(flowFilesToTransfer, REL_FAILURE);
             context.yield();
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1cf37fb/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index dc1c445..957332a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -188,6 +188,26 @@ public class TestPutElasticsearch {
         runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(true)); // simulate failures
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
 
     /**
      * A Test class that extends the processor in order to inject/mock behavior

Reply via email to