Repository: nifi
Updated Branches:
  refs/heads/master 9d273b1e2 -> 181386b94


http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
index cb928fa..9b68f2e 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
@@ -79,10 +79,10 @@ public class TestFetchElasticsearch {
     public void testFetchElasticsearchOnTrigger() throws IOException {
         runner = TestRunners.newTestRunner(new 
FetchElasticsearchTestProcessor(true)); // all docs are found
         runner.setValidateExpressionUsage(true);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
 
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
         runner.assertNotValid();
@@ -105,10 +105,10 @@ public class TestFetchElasticsearch {
     @Test
     public void testFetchElasticsearchOnTriggerWithFailures() throws 
IOException {
         runner = TestRunners.newTestRunner(new 
FetchElasticsearchTestProcessor(false)); // simulate doc not found
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
         runner.setProperty(FetchElasticsearch.TYPE, "status");
         runner.setValidateExpressionUsage(true);
@@ -129,10 +129,10 @@ public class TestFetchElasticsearch {
     @Test
     public void testFetchElasticsearchWithBadHosts() throws IOException {
         runner = TestRunners.newTestRunner(new 
FetchElasticsearchTestProcessor(false)); // simulate doc not found
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"http://127.0.0.1:9300,127.0.0.2:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"http://127.0.0.1:9300,127.0.0.2:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
         runner.setProperty(FetchElasticsearch.TYPE, "status");
         runner.setValidateExpressionUsage(true);
@@ -145,10 +145,10 @@ public class TestFetchElasticsearch {
     public void testFetchElasticsearchOnTriggerWithExceptions() throws 
IOException {
         FetchElasticsearchTestProcessor processor = new 
FetchElasticsearchTestProcessor(true);
         runner = TestRunners.newTestRunner(processor);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
         runner.setProperty(FetchElasticsearch.TYPE, "status");
         runner.setValidateExpressionUsage(true);
@@ -230,10 +230,10 @@ public class TestFetchElasticsearch {
         runner.addControllerService("ssl-context", sslService);
         runner.enableControllerService(sslService);
         runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, 
"ssl-context");
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
         runner.setProperty(FetchElasticsearch.TYPE, "status");
         runner.setValidateExpressionUsage(true);
@@ -347,10 +347,10 @@ public class TestFetchElasticsearch {
         runner.setValidateExpressionUsage(true);
 
         //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
 
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
 
@@ -378,10 +378,10 @@ public class TestFetchElasticsearch {
         runner.setValidateExpressionUsage(true);
 
         //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(FetchElasticsearch.INDEX, "doc");
 
         runner.setProperty(FetchElasticsearch.TYPE, "status");

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
new file mode 100644
index 0000000..82fa3da
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFetchElasticsearchHttp {
+
+    private InputStream docExample;
+    private TestRunner runner;
+
+    @Before
+    public void setUp() throws IOException {
+        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
+        docExample = classloader.getResourceAsStream("DocumentExample.json");
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTrigger() throws IOException {
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttpTestProcessor(true)); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTriggerWithFields() throws IOException 
{
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttpTestProcessor(true)); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.assertValid();
+        runner.setProperty(FetchElasticsearchHttp.FIELDS, "id,, 
userinfo.location");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTriggerWithDocNotFound() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+
+        runner.setIncomingConnection(true);
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        // This test generates a "document not found"
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_NOT_FOUND, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_NOT_FOUND).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTriggerWithServerErrorRetry() throws 
IOException {
+        FetchElasticsearchHttpTestProcessor processor = new 
FetchElasticsearchHttpTestProcessor(false);
+        processor.setStatus(500, "Server error");
+        runner = TestRunners.newTestRunner(processor); // simulate doc not 
found
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        // This test generates a HTTP 500 "Server error"
+        runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_RETRY, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_RETRY).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTriggerWithServerFail() throws 
IOException {
+        FetchElasticsearchHttpTestProcessor processor = new 
FetchElasticsearchHttpTestProcessor(false);
+        processor.setStatus(100, "Should fail");
+        runner = TestRunners.newTestRunner(processor); // simulate doc not 
found
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        // This test generates a HTTP 100
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_FAILURE, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void 
testFetchElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws 
IOException {
+        FetchElasticsearchHttpTestProcessor processor = new 
FetchElasticsearchHttpTestProcessor(false);
+        processor.setStatus(100, "Should fail");
+        runner = TestRunners.newTestRunner(processor); // simulate doc not 
found
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+
+        runner.setIncomingConnection(false);
+        runner.run(1, true, true);
+
+        // This test generates a HTTP 100 with no incoming flow file, so 
nothing should be transferred
+        processor.getRelationships().forEach(relationship -> 
runner.assertTransferCount(relationship, 0));
+        runner.assertTransferCount(FetchElasticsearchHttp.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testFetchElasticsearchWithBadHosts() throws IOException {
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testSetupSecureClient() throws Exception {
+        FetchElasticsearchHttpTestProcessor processor = new 
FetchElasticsearchHttpTestProcessor(true);
+        runner = TestRunners.newTestRunner(processor);
+        SSLContextService sslService = mock(SSLContextService.class);
+        when(sslService.getIdentifier()).thenReturn("ssl-context");
+        runner.addControllerService("ssl-context", sslService);
+        runner.enableControllerService(sslService);
+        runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, 
"ssl-context");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+
+        // Allow time for the controller service to fully initialize
+        Thread.sleep(500);
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+    }
+
+    /**
+     * A Test class that extends the processor in order to inject/mock behavior
+     */
+    private static class FetchElasticsearchHttpTestProcessor extends 
FetchElasticsearchHttp {
+        boolean documentExists = true;
+        Exception exceptionToThrow = null;
+        OkHttpClient client;
+        int statusCode = 200;
+        String statusMessage = "OK";
+
+        FetchElasticsearchHttpTestProcessor(boolean documentExists) {
+            this.documentExists = documentExists;
+        }
+
+        public void setExceptionToThrow(Exception exceptionToThrow) {
+            this.exceptionToThrow = exceptionToThrow;
+        }
+
+        void setStatus(int code, String message) {
+            statusCode = code;
+            statusMessage = message;
+        }
+
+        @Override
+        protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
+            client = mock(OkHttpClient.class);
+
+            when(client.newCall(any(Request.class))).thenAnswer(new 
Answer<Call>() {
+
+                @Override
+                public Call answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                    Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                    StringBuilder sb = new 
StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,");
+                    if (documentExists) {
+                        
sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}");
+                    } else {
+                        sb.append("\"found\": false");
+                    }
+                    sb.append("}");
+                    Response mockResponse = new Response.Builder()
+                            .request(realRequest)
+                            .protocol(Protocol.HTTP_1_1)
+                            .code(statusCode)
+                            .message(statusMessage)
+                            
.body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
+                            .build();
+                    final Call call = mock(Call.class);
+                    when(call.execute()).thenReturn(mockResponse);
+                    return call;
+                }
+            });
+        }
+
+        protected OkHttpClient getClient() {
+            return client;
+        }
+    }
+
+    
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Integration test section below
+    //
+    // The tests below are meant to run on real ES instances, and are thus 
@Ignored during normal test execution.
+    // However if you wish to execute them as part of a test phase, comment 
out the @Ignored line for each
+    // desired test.
+    
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Tests basic ES functionality against a local or test ES cluster
+     */
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testFetchElasticsearchBasic() {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttp());
+        runner.setValidateExpressionUsage(true);
+
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+
+        runner.enqueue(docExample);
+        runner.run(1, true, true);
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testFetchElasticsearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttp());
+        runner.setValidateExpressionUsage(true);
+
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+        runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+        runner.assertValid();
+
+        for (int i = 0; i < 100; i++) {
+            long newId = 28039652140L + i;
+            final String newStrId = Long.toString(newId);
+            runner.enqueue(docExample, new HashMap<String, String>() {{
+                put("doc_id", newStrId);
+            }});
+        }
+        runner.run(100);
+        
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index ce25b81..d7fb439 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -78,10 +78,10 @@ public class TestPutElasticsearch {
     public void testPutElasticSearchOnTrigger() throws IOException {
         runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
         runner.setValidateExpressionUsage(true);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
 
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.assertNotValid();
@@ -106,10 +106,10 @@ public class TestPutElasticsearch {
     public void testPutElasticSearchOnTriggerWithFailures() throws IOException 
{
         runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(true)); // simulate failures
         runner.setValidateExpressionUsage(false);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.TYPE, "status");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
@@ -130,10 +130,10 @@ public class TestPutElasticsearch {
     public void testPutElasticsearchOnTriggerWithExceptions() throws 
IOException {
         PutElasticsearchTestProcessor processor = new 
PutElasticsearchTestProcessor(false);
         runner = TestRunners.newTestRunner(processor);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.TYPE, "status");
         runner.setValidateExpressionUsage(true);
@@ -194,10 +194,10 @@ public class TestPutElasticsearch {
     public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws 
IOException {
         runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(true)); // simulate failures
         runner.setValidateExpressionUsage(false);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.TYPE, "status");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
@@ -215,10 +215,10 @@ public class TestPutElasticsearch {
     public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws 
IOException {
         runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false));
         runner.setValidateExpressionUsage(false);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(PutElasticsearch.INDEX, "${i}");
         runner.setProperty(PutElasticsearch.TYPE, "${type}");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
@@ -252,10 +252,10 @@ public class TestPutElasticsearch {
     public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws 
IOException {
         runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
         runner.setValidateExpressionUsage(true);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
 
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.assertNotValid();
@@ -380,10 +380,10 @@ public class TestPutElasticsearch {
         runner.setValidateExpressionUsage(false);
 
         //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
 
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
@@ -412,10 +412,10 @@ public class TestPutElasticsearch {
         runner.setValidateExpressionUsage(false);
 
         //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
-        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
-        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
-        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, 
"127.0.0.1:9300");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, 
"5s");
+        
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
new file mode 100644
index 0000000..c3d5a34
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPutElasticsearchHttp {
+
+    private static byte[] docExample;
+    private TestRunner runner;
+
+    @Before
+    public void once() throws IOException {
+        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
+        docExample = 
IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json")).getBytes();
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerIndex() throws IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerUpdate() throws IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "Update");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerDelete() throws IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "DELETE");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "${no.attr}");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testPutElasticSearchInvalidConfig() throws IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "upsert");
+        runner.assertNotValid();
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testPutElasticSearchOnTriggerWithFailures() throws IOException 
{
+        PutElasticsearchTestProcessor processor = new 
PutElasticsearchTestProcessor(true);
+        processor.setStatus(100, "Should fail");
+        runner = TestRunners.newTestRunner(processor); // simulate failures
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(true)); // simulate failures
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
+
+    @Test
+    public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false));
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "${i}");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "${type}");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("i", "doc");
+            put("type", "status");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        runner.clearTransferState();
+
+        // Now try an empty attribute value, should fail
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+            put("type", "status");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        final MockFlowFile out2 = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out2);
+    }
+
+    @Test
+    public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.assertValid();
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.assertValid();
+
+        runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index_fail");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
+
+    /**
+     * A Test class that extends the processor in order to inject/mock behavior
+     */
+    private static class PutElasticsearchTestProcessor extends 
PutElasticsearchHttp {
+        boolean responseHasFailures = false;
+        OkHttpClient client;
+        int statusCode = 200;
+        String statusMessage = "OK";
+
+        PutElasticsearchTestProcessor(boolean responseHasFailures) {
+            this.responseHasFailures = responseHasFailures;
+        }
+
+        void setStatus(int code, String message) {
+            statusCode = code;
+            statusMessage = message;
+        }
+
+        @Override
+        protected void createElasticsearchClient(ProcessContext context) 
throws ProcessException {
+            client = mock(OkHttpClient.class);
+
+            when(client.newCall(any(Request.class))).thenAnswer(new 
Answer<Call>() {
+
+                @Override
+                public Call answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                    Request realRequest = (Request) 
invocationOnMock.getArguments()[0];
+                    StringBuilder sb = new StringBuilder("{\"took\": 1, 
\"errors\": \"");
+                    sb.append(responseHasFailures);
+                    sb.append("\", \"items\": [");
+                    if (responseHasFailures) {
+                        // This case is for a status code of 200 for the bulk 
response itself, but with an error (of 400) inside
+                        
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
+                        
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed 
to parse [gender]\",");
+                        
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected
 end-of-input in VALUE_STRING\\n at ");
+                        sb.append("[Source: 
org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, 
column: 39]\"}}}}");
+                    } else {
+                        
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+                        sb.append(statusCode);
+                        sb.append(",\"_source\":{\"text\": \"This is a test 
document\"}}}");
+                    }
+                    sb.append("]}");
+                    Response mockResponse = new Response.Builder()
+                            .request(realRequest)
+                            .protocol(Protocol.HTTP_1_1)
+                            .code(statusCode)
+                            .message(statusMessage)
+                            
.body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
+                            .build();
+                    final Call call = mock(Call.class);
+                    when(call.execute()).thenReturn(mockResponse);
+                    return call;
+                }
+            });
+        }
+
+        protected OkHttpClient getClient() {
+            return client;
+        }
+    }
+
+    
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Integration test section below
+    //
+    // The tests below are meant to run on real ES instances, and are thus 
@Ignored during normal test execution.
+    // However if you wish to execute them as part of a test phase, comment 
out the @Ignored line for each
+    // desired test.
+    
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Tests basic ES functionality against a local or test ES cluster
+     */
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasic() {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearchHttp());
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+
+        runner.enqueue(docExample);
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
1);
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearchHttp());
+        runner.setValidateExpressionUsage(false);
+
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "100");
+        runner.setProperty(PutElasticsearchHttp.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
+        runner.assertValid();
+
+        for (int i = 0; i < 100; i++) {
+            long newId = 28039652140L + i;
+            final String newStrId = Long.toString(newId);
+            runner.enqueue(docExample, new HashMap<String, String>() {{
+                put("doc_id", newStrId);
+            }});
+        }
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 
100);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/181386b9/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
index 014a66c..66449cf 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 {
   "created_at": "Thu Jan 21 16:02:46 +0000 2016",
   "text": "This is a test document from a mock social media service",

Reply via email to