http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java new file mode 100644 index 0000000..cb928fa --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.MockProcessorInitializationContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.get.GetAction; +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.AdapterActionFuture; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestFetchElasticsearch { + + private InputStream docExample; + private TestRunner runner; + + @Before + public 
void setUp() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + docExample = classloader.getResourceAsStream("DocumentExample.json"); + + } + + @After + public void teardown() { + runner = null; + } + + @Test + public void testFetchElasticsearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithFailures() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + 
runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a "document not found" + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_NOT_FOUND, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_NOT_FOUND).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchWithBadHosts() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + + runner.assertNotValid(); + } + + @Test + public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException { + FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true); + runner = TestRunners.newTestRunner(processor); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + 
runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + + // No Node Available exception + processor.setExceptionToThrow(new NoNodeAvailableException("test")); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Elasticsearch Timeout exception + processor.setExceptionToThrow(new ElasticsearchTimeoutException("test")); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652141"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Receive Timeout Transport exception + processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class))); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652141"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Node Closed exception + processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class))); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652141"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Elasticsearch Parse exception + processor.setExceptionToThrow(new ElasticsearchParseException("test")); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652141"); + }}); + runner.run(1, true, true); + + // The mocked execute() throws the parse exception; the flow file is expected on the failure relationship + runner.assertTransferCount(FetchElasticsearch.REL_FAILURE, 1); + } + + @Test(expected = 
ProcessException.class) + public void testCreateElasticsearchClientWithException() throws ProcessException { + FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true) { + @Override + protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl, + String username, String password) + throws MalformedURLException { + throw new MalformedURLException(); + } + }; + + MockProcessContext context = new MockProcessContext(processor); + processor.initialize(new MockProcessorInitializationContext(processor, context)); + processor.callCreateElasticsearchClient(context); + } + + @Test + public void testSetupSecureClient() throws Exception { + FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true); + runner = TestRunners.newTestRunner(processor); + SSLContextService sslService = mock(SSLContextService.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + runner.addControllerService("ssl-context", sslService); + runner.enableControllerService(sslService); + runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + + // Allow time for the controller service to fully initialize + Thread.sleep(500); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + } + + /** + * A test subclass of FetchElasticsearch that returns a mocked TransportClient, so tests can inject found/not-found responses or an exception thrown from execute() + */ + 
private static class FetchElasticsearchTestProcessor extends FetchElasticsearch { + boolean documentExists = true; + Exception exceptionToThrow = null; + + public FetchElasticsearchTestProcessor(boolean documentExists) { + this.documentExists = documentExists; + } + + public void setExceptionToThrow(Exception exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + @Override + protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl, + String username, String password) + throws MalformedURLException { + TransportClient mockClient = mock(TransportClient.class); + GetRequestBuilder getRequestBuilder = spy(new GetRequestBuilder(mockClient, GetAction.INSTANCE)); + if (exceptionToThrow != null) { + doThrow(exceptionToThrow).when(getRequestBuilder).execute(); + } else { + doReturn(new MockGetRequestBuilderExecutor(documentExists)).when(getRequestBuilder).execute(); + } + when(mockClient.prepareGet(anyString(), anyString(), anyString())).thenReturn(getRequestBuilder); + + return mockClient; + } + + public void callCreateElasticsearchClient(ProcessContext context) { + createElasticsearchClient(context); + } + + private static class MockGetRequestBuilderExecutor + extends AdapterActionFuture<GetResponse, ActionListener<GetResponse>> + implements ListenableActionFuture<GetResponse> { + + boolean documentExists = true; + + public MockGetRequestBuilderExecutor(boolean documentExists) { + this.documentExists = documentExists; + } + + + @Override + protected GetResponse convert(ActionListener<GetResponse> bulkResponseActionListener) { + return null; + } + + @Override + public void addListener(ActionListener<GetResponse> actionListener) { + + } + + @Override + public GetResponse get() throws InterruptedException, ExecutionException { + GetResponse response = mock(GetResponse.class); + when(response.isExists()).thenReturn(documentExists); + when(response.getSourceAsBytes()).thenReturn("Success".getBytes()); + 
when(response.getSourceAsString()).thenReturn("Success"); + return response; + } + + @Override + public GetResponse actionGet() { + try { + return get(); + } catch (Exception e) { + fail(e.getMessage()); + } + return null; + } + } + } + + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus marked @Ignore during normal test execution. + // However, if you wish to execute them as part of a test phase, comment out the @Ignore line for each + // desired test. + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Tests basic ES functionality against a local or test ES cluster + */ + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testFetchElasticsearchBasic() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch()); + runner.setValidateExpressionUsage(true); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + + runner.enqueue(docExample); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1); + } + + @Test + 
@Ignore("Comment this out if you want to run against local or test ES") + public void testFetchElasticsearchBatch() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch()); + runner.setValidateExpressionUsage(true); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + runner.assertValid(); + + + String message = convertStreamToString(docExample); + for (int i = 0; i < 100; i++) { + + long newId = 28039652140L + i; + final String newStrId = Long.toString(newId); + runner.enqueue(message.getBytes(), new HashMap<String, String>() {{ + put("doc_id", newStrId); + }}); + + } + + runner.run(); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 100); + } + + /** + * Convert an input stream to a string + * + * @param is the input stream to read + * @return the contents of the input stream as a string, or an empty string if the stream yields no token + */ + static String convertStreamToString(InputStream is) { + java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); + return s.hasNext() ? s.next() : ""; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 6af8fd2..dc1c445 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -16,22 +16,27 @@ */ package org.apache.nifi.processors.elasticsearch; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.AdapterActionFuture; import 
org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -46,21 +51,20 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class TestPutElasticsearch { - private InputStream twitterExample; + private InputStream docExample; private TestRunner runner; @Before public void setUp() throws IOException { ClassLoader classloader = Thread.currentThread().getContextClassLoader(); - twitterExample = classloader - .getResourceAsStream("TweetExample.json"); - + docExample = classloader.getResourceAsStream("DocumentExample.json"); } @After @@ -70,105 +74,151 @@ public class TestPutElasticsearch { @Test public void testPutElasticSearchOnTrigger() throws IOException { - runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures - runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); - runner.setProperty(PutElasticsearch.INDEX, "tweet"); + 
runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.assertNotValid(); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); runner.assertNotValid(); - runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); runner.assertValid(); - runner.enqueue(twitterExample, new HashMap<String, String>() {{ - put("tweet_id", "28039652140"); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); }}); runner.run(1, true, true); runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); assertNotNull(out); - out.assertAttributeEquals("tweet_id", "28039652140"); + out.assertAttributeEquals("doc_id", "28039652140"); } @Test public void testPutElasticSearchOnTriggerWithFailures() throws IOException { - runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(true)); // simulate failures + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); - runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); - runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); - runner.enqueue(twitterExample, new HashMap<String, 
String>() {{ - put("tweet_id", "28039652140"); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); }}); runner.run(1, true, true); runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0); assertNotNull(out); - out.assertAttributeEquals("tweet_id", "28039652140"); + out.assertAttributeEquals("doc_id", "28039652140"); } @Test - public void testPutElasticSearchOnTriggerNode() throws IOException { - runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures - runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node"); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + public void testPutElasticsearchOnTriggerWithExceptions() throws IOException { + PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false); + runner = TestRunners.newTestRunner(processor); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); - - runner.setProperty(PutElasticsearch.INDEX, "tweet"); - runner.assertNotValid(); + runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); - runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); - runner.assertNotValid(); - runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); - runner.assertValid(); + runner.setValidateExpressionUsage(true); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); - runner.enqueue(twitterExample, new HashMap<String, String>() {{ - put("tweet_id", "28039652141"); + // No Node Available exception + 
processor.setExceptionToThrow(new NoNodeAvailableException("test")); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); }}); runner.run(1, true, true); - runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); - assertNotNull(out); - out.assertAttributeEquals("tweet_id", "28039652141"); + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Elasticsearch Timeout exception + processor.setExceptionToThrow(new ElasticsearchTimeoutException("test")); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652141"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Receive Timeout Transport exception + processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class))); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652142"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Node Closed exception + processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class))); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652143"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1); + runner.clearTransferState(); + + // Elasticsearch Parse exception + processor.setExceptionToThrow(new ElasticsearchParseException("test")); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652144"); + }}); + runner.run(1, true, true); + + // This test generates an exception on execute(),routes to failure + runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1); 
} + /** * A Test class that extends the processor in order to inject/mock behavior */ - private static class ElasticsearchTestProcessor extends PutElasticsearch { + private static class PutElasticsearchTestProcessor extends PutElasticsearch { boolean responseHasFailures = false; + Exception exceptionToThrow = null; - public ElasticsearchTestProcessor(boolean responseHasFailures) { + public PutElasticsearchTestProcessor(boolean responseHasFailures) { this.responseHasFailures = responseHasFailures; } + public void setExceptionToThrow(Exception exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + @Override - @OnScheduled - public void createClient(ProcessContext context) throws IOException { - esClient = mock(Client.class); - BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(esClient, BulkAction.INSTANCE)); - doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute(); - when(esClient.prepareBulk()).thenReturn(bulkRequestBuilder); - - IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(esClient, IndexAction.INSTANCE); - when(esClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder); + public void createElasticsearchClient(ProcessContext context) throws ProcessException { + Client mockClient = mock(Client.class); + BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE)); + if (exceptionToThrow != null) { + doThrow(exceptionToThrow).when(bulkRequestBuilder).execute(); + } else { + doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute(); + } + when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder); + + IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE); + when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder); + + esClient.set(mockClient); } private static 
class MockBulkRequestBuilderExecutor @@ -195,6 +245,10 @@ public class TestPutElasticsearch { public BulkResponse get() throws InterruptedException, ExecutionException { BulkResponse response = mock(BulkResponse.class); when(response.hasFailures()).thenReturn(responseHasFailures); + BulkItemResponse item = mock(BulkItemResponse.class); + when(item.getItemId()).thenReturn(1); + when(item.isFailed()).thenReturn(true); + when(response.getItems()).thenReturn(new BulkItemResponse[]{item}); return response; } @@ -212,72 +266,37 @@ public class TestPutElasticsearch { /** * Tests basic ES functionality against a local or test ES cluster - * @throws IOException */ @Test @Ignore("Comment this out if you want to run against local or test ES") - public void testPutElasticSearchBasic() throws IOException { + public void testPutElasticSearchBasic() { System.out.println("Starting test " + new Object() { }.getClass().getEnclosingMethod().getName()); final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); runner.setValidateExpressionUsage(false); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport"); runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); - runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); runner.setProperty(PutElasticsearch.TYPE, "status"); - runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); runner.assertValid(); - runner.enqueue(twitterExample, new HashMap<String, String>() {{ - put("tweet_id", "28039652140"); + runner.enqueue(docExample, new 
HashMap<String, String>() {{ + put("doc_id", "28039652140"); }}); - runner.enqueue(twitterExample); + runner.enqueue(docExample); runner.run(1, true, true); runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); - - } - - @Test - @Ignore("Comment this out if you want to run against local or test ES") - public void testPutElasticSearchBasicNode() throws IOException { - System.out.println("Starting test " + new Object() { - }.getClass().getEnclosingMethod().getName()); - final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); - runner.setValidateExpressionUsage(false); - - //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node"); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, "/usr/local/opt/elasticsearch"); - runner.setProperty(PutElasticsearch.INDEX, "tweet"); - runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); - - runner.setProperty(PutElasticsearch.TYPE, "status"); - runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); - runner.assertValid(); - - runner.enqueue(twitterExample, new HashMap<String, String>() {{ - put("tweet_id", "28039652141"); - }}); - - runner.enqueue(twitterExample); - runner.run(1, true, true); - - runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); - } @Test @@ -289,31 +308,25 @@ public class TestPutElasticsearch { runner.setValidateExpressionUsage(false); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport"); runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); - runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "100"); runner.setProperty(PutElasticsearch.TYPE, "status"); - runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); runner.assertValid(); - JsonParser parser = new JsonParser(); - JsonObject json; - String message = convertStreamToString(twitterExample); + String message = convertStreamToString(docExample); for (int i = 0; i < 100; i++) { - json = parser.parse(message).getAsJsonObject(); - String id = json.get("id").getAsString(); - long newId = Long.parseLong(id) + i; + long newId = 28039652140L + i; final String newStrId = Long.toString(newId); - //json.addProperty("id", newId); runner.enqueue(message.getBytes(), new HashMap<String, String>() {{ - put("tweet_id", newStrId); + put("doc_id", newStrId); }}); } @@ -321,8 +334,6 @@ public class TestPutElasticsearch { runner.run(); runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 100); - final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); - } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json new file mode 100644 index 0000000..014a66c --- /dev/null +++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "created_at": "Thu Jan 21 16:02:46 +0000 2016", + "text": "This is a test document from a mock social media service", + "contributors": null, + "id": 28039652140, + "shares": null, + "geographic_location": null, + "userinfo": { + "name": "Not A. 
Person", + "location": "Orlando, FL", + "created_at": "Fri Oct 24 23:22:09 +0000 2008", + "follow_count": 1, + "url": "http://not.a.real.site", + "id": 16958875, + "lang": "en", + "time_zone": "Mountain Time (US & Canada)", + "description": "I'm a test person.", + "following_count": 71, + "screen_name": "Nobody" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json deleted file mode 100644 index 7375be6..0000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json +++ /dev/null @@ -1,83 +0,0 @@ - -{ - "coordinates": null, - "created_at": "Thu Oct 21 16:02:46 +0000 2010", - "favorited": false, - "truncated": false, - "id_str": "28039652140", - "entities": { - "urls": [ - { - "expanded_url": null, - "url": "http://gnip.com/success_stories", - "indices": [ - 69, - 100 - ] - } - ], - "hashtags": [ - - ], - "user_mentions": [ - { - "name": "Gnip, Inc.", - "id_str": "16958875", - "id": 16958875, - "indices": [ - 25, - 30 - ], - "screen_name": "gnip" - } - ] - }, - "in_reply_to_user_id_str": null, - "text": "what we've been up to at @gnip -- delivering data to happy customers http://gnip.com/success_stories", - "contributors": null, - "id": 28039652140, - "retweet_count": null, - "in_reply_to_status_id_str": null, - "geo": null, - "retweeted": false, - "in_reply_to_user_id": null, - "user": { - "profile_sidebar_border_color": "C0DEED", - "name": "Gnip, Inc.", - "profile_sidebar_fill_color": "DDEEF6", - "profile_background_tile": false, - "profile_image_url": 
"http://a3.twimg.com/profile_images/62803643/icon_normal.png", - "location": "Boulder, CO", - "created_at": "Fri Oct 24 23:22:09 +0000 2008", - "id_str": "16958875", - "follow_request_sent": false, - "profile_link_color": "0084B4", - "favourites_count": 1, - "url": "http://blog.gnip.com", - "contributors_enabled": false, - "utc_offset": -25200, - "id": 16958875, - "profile_use_background_image": true, - "listed_count": 23, - "protected": false, - "lang": "en", - "profile_text_color": "333333", - "followers_count": 260, - "time_zone": "Mountain Time (US & Canada)", - "verified": false, - "geo_enabled": true, - "profile_background_color": "C0DEED", - "notifications": false, - "description": "Gnip makes it really easy for you to collect social data for your business.", - "friends_count": 71, - "profile_background_image_url": "http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png", - "statuses_count": 302, - "screen_name": "gnip", - "following": false, - "show_all_inline_media": false - }, - "in_reply_to_screen_name": null, - "source": "web", - "place": null, - "in_reply_to_status_id": null -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml index 8255487..31039ae 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml @@ -1,4 +1,14 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor +license agreements. See the NOTICE file distributed with this work for additional +information regarding copyright ownership. 
The ASF licenses this file to +You under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required +by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, either express or implied. See the License for the specific +language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> @@ -23,4 +33,14 @@ <module>nifi-elasticsearch-processors</module> </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-elasticsearch-processors</artifactId> + <version>0.4.2-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> + </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 646ae1c..0cdc917 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -52,6 +52,7 @@ <module>nifi-riemann-bundle</module> <module>nifi-html-bundle</module> <module>nifi-scripting-bundle</module> + <module>nifi-elasticsearch-bundle</module> </modules> <dependencyManagement> <dependencies>
