Repository: nifi
Updated Branches:
  refs/heads/master 4babd067c -> bbbc77707


NIFI-1666: Fixed bug with EL evaluation in PutElasticsearch processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bbbc7770
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bbbc7770
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bbbc7770

Branch: refs/heads/master
Commit: bbbc77707f324670de63730e39804c2196178b16
Parents: 4babd06
Author: Matt Burgess <mattyb...@apache.org>
Authored: Tue Mar 22 14:00:25 2016 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Tue Mar 22 15:10:27 2016 -0400

----------------------------------------------------------------------
 .../elasticsearch/PutElasticsearch.java         |  5 +-
 .../elasticsearch/TestPutElasticsearch.java     | 60 +++++++++++++++++++-
 2 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bbbc7770/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 6319791..a0c986c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -146,9 +146,7 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
         final String id_attribute = 
context.getProperty(ID_ATTRIBUTE).getValue();
-        final String docType = 
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
         final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
         final List<FlowFile> flowFiles = session.get(batchSize);
@@ -166,6 +164,9 @@ public class PutElasticsearch extends 
AbstractElasticsearchProcessor {
             }
 
             for (FlowFile file : flowFiles) {
+                final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
+                final String docType = 
context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
+
                 final String id = file.getAttribute(id_attribute);
                 if (id == null) {
                     logger.error("No value in identifier attribute {} for {}, 
transferring to failure", new Object[]{id_attribute, file});

http://git-wip-us.apache.org/repos/asf/nifi/blob/bbbc7770/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index 957332a..fa2767b 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -41,6 +41,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 
 import java.io.IOException;
@@ -209,6 +211,43 @@ public class TestPutElasticsearch {
         assertNotNull(out);
     }
 
+    @Test
+    public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false));
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(PutElasticsearch.INDEX, "${i}");
+        runner.setProperty(PutElasticsearch.TYPE, "${type}");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+            put("type", "status");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // Now try an empty attribute value, should fail
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("type", "status");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_RETRY, 1);
+        final MockFlowFile out2 = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_RETRY).get(0);
+        assertNotNull(out2);
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -226,7 +265,7 @@ public class TestPutElasticsearch {
 
         @Override
         public void createElasticsearchClient(ProcessContext context) throws 
ProcessException {
-            Client mockClient = mock(Client.class);
+            final Client mockClient = mock(Client.class);
             BulkRequestBuilder bulkRequestBuilder = spy(new 
BulkRequestBuilder(mockClient, BulkAction.INSTANCE));
             if (exceptionToThrow != null) {
                 doThrow(exceptionToThrow).when(bulkRequestBuilder).execute();
@@ -235,8 +274,23 @@ public class TestPutElasticsearch {
             }
             when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder);
 
-            IndexRequestBuilder indexRequestBuilder = new 
IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
-            when(mockClient.prepareIndex(anyString(), anyString(), 
anyString())).thenReturn(indexRequestBuilder);
+            when(mockClient.prepareIndex(anyString(), anyString(), 
anyString())).thenAnswer(new Answer<IndexRequestBuilder>() {
+                @Override
+                public IndexRequestBuilder answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                    Object[] args = invocationOnMock.getArguments();
+                    String arg1 = (String) args[0];
+                    if (arg1.isEmpty()) {
+                        throw new NoNodeAvailableException("Needs index");
+                    }
+                    String arg2 = (String) args[1];
+                    if (arg2.isEmpty()) {
+                        throw new NoNodeAvailableException("Needs doc type");
+                    } else {
+                        IndexRequestBuilder indexRequestBuilder = new 
IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
+                        return indexRequestBuilder;
+                    }
+                }
+            });
 
             esClient.set(mockClient);
         }

Reply via email to