Repository: nifi
Updated Branches:
  refs/heads/master 4babd067c -> bbbc77707
NIFI-1666: Fixed bug with EL evaluation in PutElasticsearch processor Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bbbc7770 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bbbc7770 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bbbc7770 Branch: refs/heads/master Commit: bbbc77707f324670de63730e39804c2196178b16 Parents: 4babd06 Author: Matt Burgess <mattyb...@apache.org> Authored: Tue Mar 22 14:00:25 2016 -0400 Committer: Matt Burgess <mattyb...@apache.org> Committed: Tue Mar 22 15:10:27 2016 -0400 ---------------------------------------------------------------------- .../elasticsearch/PutElasticsearch.java | 5 +- .../elasticsearch/TestPutElasticsearch.java | 60 +++++++++++++++++++- 2 files changed, 60 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/bbbc7770/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 6319791..a0c986c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -146,9 +146,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { @Override public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - final String index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue(); final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final String docType = context.getProperty(TYPE).evaluateAttributeExpressions().getValue(); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final List<FlowFile> flowFiles = session.get(batchSize); @@ -166,6 +164,9 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { } for (FlowFile file : flowFiles) { + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue(); + final String id = file.getAttribute(id_attribute); if (id == null) { logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file}); http://git-wip-us.apache.org/repos/asf/nifi/blob/bbbc7770/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 957332a..fa2767b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -41,6 +41,8 
@@ import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; @@ -209,6 +211,43 @@ public class TestPutElasticsearch { assertNotNull(out); } + @Test + public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "${i}"); + runner.setProperty(PutElasticsearch.TYPE, "${type}"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652144"); + put("i", "doc"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + runner.clearTransferState(); + + // Now try an empty attribute value, should fail + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652144"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_RETRY, 1); + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearch.REL_RETRY).get(0); + assertNotNull(out2); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -226,7 +265,7 @@ public class 
TestPutElasticsearch { @Override public void createElasticsearchClient(ProcessContext context) throws ProcessException { - Client mockClient = mock(Client.class); + final Client mockClient = mock(Client.class); BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE)); if (exceptionToThrow != null) { doThrow(exceptionToThrow).when(bulkRequestBuilder).execute(); @@ -235,8 +274,23 @@ public class TestPutElasticsearch { } when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder); - IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE); - when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder); + when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenAnswer(new Answer<IndexRequestBuilder>() { + @Override + public IndexRequestBuilder answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + String arg1 = (String) args[0]; + if (arg1.isEmpty()) { + throw new NoNodeAvailableException("Needs index"); + } + String arg2 = (String) args[1]; + if (arg2.isEmpty()) { + throw new NoNodeAvailableException("Needs doc type"); + } else { + IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE); + return indexRequestBuilder; + } + } + }); esClient.set(mockClient); }