Repository: nifi Updated Branches: refs/heads/master 9d273b1e2 -> 181386b94
http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java index cb928fa..9b68f2e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java @@ -79,10 +79,10 @@ public class TestFetchElasticsearch { public void testFetchElasticsearchOnTrigger() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found runner.setValidateExpressionUsage(true); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.assertNotValid(); @@ 
-105,10 +105,10 @@ public class TestFetchElasticsearch { @Test public void testFetchElasticsearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -129,10 +129,10 @@ public class TestFetchElasticsearch { @Test public void testFetchElasticsearchWithBadHosts() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -145,10 +145,10 @@ public class TestFetchElasticsearch { public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException { FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true); runner = TestRunners.newTestRunner(processor); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -230,10 +230,10 @@ public class TestFetchElasticsearch { runner.addControllerService("ssl-context", sslService); runner.enableControllerService(sslService); runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -347,10 +347,10 @@ public class TestFetchElasticsearch { runner.setValidateExpressionUsage(true); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); @@ -378,10 +378,10 @@ public class TestFetchElasticsearch { runner.setValidateExpressionUsage(true); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java new file mode 100644 index 0000000..82fa3da --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFetchElasticsearchHttp { + + private InputStream docExample; + private TestRunner runner; + + @Before + public void setUp() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + docExample = classloader.getResourceAsStream("DocumentExample.json"); + } + + @After + public void teardown() { + runner = null; + } + + @Test + public void testFetchElasticsearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ 
+ put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithFields() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + runner.setProperty(FetchElasticsearchHttp.FIELDS, "id,, userinfo.location"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithDocNotFound() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setIncomingConnection(true); + runner.enqueue(docExample, new HashMap<String, String>() {{ + 
put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a "document not found" + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_NOT_FOUND, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_NOT_FOUND).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithServerErrorRetry() throws IOException { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(false); + processor.setStatus(500, "Server error"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a HTTP 500 "Server error" + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_RETRY, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_RETRY).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithServerFail() throws IOException { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(false); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + 
runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a HTTP 100 + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(false); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setIncomingConnection(false); + runner.run(1, true, true); + + // This test generates a HTTP 100 with no incoming flow file, so nothing should be transferred + processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0)); + runner.assertTransferCount(FetchElasticsearchHttp.REL_FAILURE, 0); + } + + @Test + public void testFetchElasticsearchWithBadHosts() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + 
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.assertNotValid(); + } + + @Test + public void testSetupSecureClient() throws Exception { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(true); + runner = TestRunners.newTestRunner(processor); + SSLContextService sslService = mock(SSLContextService.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + runner.addControllerService("ssl-context", sslService); + runner.enableControllerService(sslService); + runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + // Allow time for the controller service to fully initialize + Thread.sleep(500); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + } + + /** + * A Test class that extends the processor in order to inject/mock behavior + */ + private static class FetchElasticsearchHttpTestProcessor extends FetchElasticsearchHttp { + boolean documentExists = true; + Exception exceptionToThrow = null; + OkHttpClient client; + int statusCode = 200; + String statusMessage = "OK"; + + FetchElasticsearchHttpTestProcessor(boolean documentExists) { + this.documentExists = documentExists; + } + + public void setExceptionToThrow(Exception exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + void setStatus(int code, String message) { + statusCode = code; + statusMessage = message; + } + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + client = mock(OkHttpClient.class); + + when(client.newCall(any(Request.class))).thenAnswer(new Answer<Call>() { + + @Override + 
public Call answer(InvocationOnMock invocationOnMock) throws Throwable { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + StringBuilder sb = new StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,"); + if (documentExists) { + sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}"); + } else { + sb.append("\"found\": false"); + } + sb.append("}"); + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString())) + .build(); + final Call call = mock(Call.class); + when(call.execute()).thenReturn(mockResponse); + return call; + } + }); + } + + protected OkHttpClient getClient() { + return client; + } + } + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution. + // However if you wish to execute them as part of a test phase, comment out the @Ignored line for each + // desired test. 
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Tests basic ES functionality against a local or test ES cluster + */ + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testFetchElasticsearchBasic() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testFetchElasticsearchBatch() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + for (int i = 0; i < 100; i++) { + long newId = 28039652140L + i; + final String newStrId = Long.toString(newId); + 
runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", newStrId); + }}); + } + runner.run(100); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index ce25b81..d7fb439 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -78,10 +78,10 @@ public class TestPutElasticsearch { public void testPutElasticSearchOnTrigger() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures runner.setValidateExpressionUsage(true); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.assertNotValid(); @@ -106,10 +106,10 @@ public class TestPutElasticsearch { public void testPutElasticSearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -130,10 +130,10 @@ public class TestPutElasticsearch { public void testPutElasticsearchOnTriggerWithExceptions() throws IOException { PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false); runner = TestRunners.newTestRunner(processor); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -194,10 +194,10 @@ public class TestPutElasticsearch { public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -215,10 +215,10 @@ public class TestPutElasticsearch { public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - 
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "${i}"); runner.setProperty(PutElasticsearch.TYPE, "${type}"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -252,10 +252,10 @@ public class TestPutElasticsearch { public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures runner.setValidateExpressionUsage(true); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.assertNotValid(); @@ -380,10 +380,10 @@ public class TestPutElasticsearch { runner.setValidateExpressionUsage(false); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -412,10 +412,10 @@ public class TestPutElasticsearch { runner.setValidateExpressionUsage(false); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "100"); http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java new file mode 100644 index 0000000..c3d5a34 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.HashMap; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestPutElasticsearchHttp { + + private static byte[] docExample; + private TestRunner runner; + + @Before + public void once() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + docExample = IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json")).getBytes(); + } + + @After + public void teardown() { + runner = null; + } + + @Test + public void testPutElasticSearchOnTriggerIndex() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); 
+ runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testPutElasticSearchOnTriggerUpdate() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "Update"); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testPutElasticSearchOnTriggerDelete() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "DELETE"); + runner.enqueue(docExample, new HashMap<String, String>() {{ + 
put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "${no.attr}"); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testPutElasticSearchInvalidConfig() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.assertValid(); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, ""); + runner.assertNotValid(); + 
runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index"); + runner.assertValid(); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "upsert"); + runner.assertNotValid(); + } + + @Test(expected = AssertionError.class) + public void testPutElasticSearchOnTriggerWithFailures() throws IOException { + PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + } + + @Test + public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + } + + @Test + public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException { + runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "${i}"); + runner.setProperty(PutElasticsearchHttp.TYPE, "${type}"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652144"); + put("i", "doc"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + runner.clearTransferState(); + + // Now try an empty attribute value, should fail + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652144"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out2); + } + + @Test + public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.assertValid(); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index_fail"); + runner.assertValid(); + + 
runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + } + + /** + * A Test class that extends the processor in order to inject/mock behavior + */ + private static class PutElasticsearchTestProcessor extends PutElasticsearchHttp { + boolean responseHasFailures = false; + OkHttpClient client; + int statusCode = 200; + String statusMessage = "OK"; + + PutElasticsearchTestProcessor(boolean responseHasFailures) { + this.responseHasFailures = responseHasFailures; + } + + void setStatus(int code, String message) { + statusCode = code; + statusMessage = message; + } + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + client = mock(OkHttpClient.class); + + when(client.newCall(any(Request.class))).thenAnswer(new Answer<Call>() { + + @Override + public Call answer(InvocationOnMock invocationOnMock) throws Throwable { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); + sb.append(responseHasFailures); + sb.append("\", \"items\": ["); + if (responseHasFailures) { + // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside + sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); + sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); + sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); + sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}}"); + } else { + 
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":"); + sb.append(statusCode); + sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}"); + } + sb.append("]}"); + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString())) + .build(); + final Call call = mock(Call.class); + when(call.execute()).thenReturn(mockResponse); + return call; + } + }); + } + + protected OkHttpClient getClient() { + return client; + } + } + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus marked @Ignore during normal test execution. + // However, if you wish to execute them as part of a test phase, comment out the @Ignore line for each + // desired test. 
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Tests basic ES functionality against a local or test ES cluster + */ + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBasic() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBatch() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "100"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + for (int i = 0; i < 100; i++) { + long newId = 28039652140L + i; + final String newStrId = 
Long.toString(newId); + runner.enqueue(docExample, new HashMap<String, String>() {{ + put("doc_id", newStrId); + }}); + } + runner.run(); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 100); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json index 014a66c..66449cf 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json @@ -1,19 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ { "created_at": "Thu Jan 21 16:02:46 +0000 2016", "text": "This is a test document from a mock social media service",
