http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
new file mode 100644
index 0000000..cb928fa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessorInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.get.GetAction;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.support.AdapterActionFuture;
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.NodeClosedException;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestFetchElasticsearch {
+
+    private InputStream docExample;
+    private TestRunner runner;
+
+    @Before
+    public void setUp() throws IOException {
+        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
+        docExample = classloader.getResourceAsStream("DocumentExample.json");
+
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTrigger() throws IOException {
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchTestProcessor(true)); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTriggerWithFailures() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchTestProcessor(false)); // simulate doc not found
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        // This test generates a "document not found"
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_NOT_FOUND, 
1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(FetchElasticsearch.REL_NOT_FOUND).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("doc_id", "28039652140");
+    }
+
+    @Test
+    public void testFetchElasticsearchWithBadHosts() throws IOException {
+        runner = TestRunners.newTestRunner(new 
FetchElasticsearchTestProcessor(false)); // simulate doc not found
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"http://127.0.0.1:9300,127.0.0.2:9300";);
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTriggerWithExceptions() throws 
IOException {
+        FetchElasticsearchTestProcessor processor = new 
FetchElasticsearchTestProcessor(true);
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+
+        // No Node Available exception
+        processor.setExceptionToThrow(new NoNodeAvailableException("test"));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Elasticsearch Timeout exception
+        processor.setExceptionToThrow(new 
ElasticsearchTimeoutException("test"));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Receive Timeout Transport exception
+        processor.setExceptionToThrow(new 
ReceiveTimeoutTransportException(mock(StreamInput.class)));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Node Closed exception
+        processor.setExceptionToThrow(new 
NodeClosedException(mock(StreamInput.class)));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Elasticsearch Parse exception
+        processor.setExceptionToThrow(new ElasticsearchParseException("test"));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        // This test generates an exception on execute(),routes to failure
+        runner.assertTransferCount(FetchElasticsearch.REL_FAILURE, 1);
+    }
+
+    @Test(expected = ProcessException.class)
+    public void testCreateElasticsearchClientWithException() throws 
ProcessException {
+        FetchElasticsearchTestProcessor processor = new 
FetchElasticsearchTestProcessor(true) {
+            @Override
+            protected TransportClient getTransportClient(Settings.Builder 
settingsBuilder, String shieldUrl,
+                                                         String username, 
String password)
+                    throws MalformedURLException {
+                throw new MalformedURLException();
+            }
+        };
+
+        MockProcessContext context = new MockProcessContext(processor);
+        processor.initialize(new MockProcessorInitializationContext(processor, 
context));
+        processor.callCreateElasticsearchClient(context);
+    }
+
+    @Test
+    public void testSetupSecureClient() throws Exception {
+        FetchElasticsearchTestProcessor processor = new 
FetchElasticsearchTestProcessor(true);
+        runner = TestRunners.newTestRunner(processor);
+        SSLContextService sslService = mock(SSLContextService.class);
+        when(sslService.getIdentifier()).thenReturn("ssl-context");
+        runner.addControllerService("ssl-context", sslService);
+        runner.enableControllerService(sslService);
+        runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, 
"ssl-context");
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+
+        // Allow time for the controller service to fully initialize
+        Thread.sleep(500);
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+        runner.run(1, true, true);
+
+    }
+
+    /**
+     * A Test class that extends the processor in order to inject/mock behavior
+     */
+    private static class FetchElasticsearchTestProcessor extends 
FetchElasticsearch {
+        boolean documentExists = true;
+        Exception exceptionToThrow = null;
+
+        public FetchElasticsearchTestProcessor(boolean documentExists) {
+            this.documentExists = documentExists;
+        }
+
+        public void setExceptionToThrow(Exception exceptionToThrow) {
+            this.exceptionToThrow = exceptionToThrow;
+        }
+
+        @Override
+        protected TransportClient getTransportClient(Settings.Builder 
settingsBuilder, String shieldUrl,
+                                                     String username, String 
password)
+                throws MalformedURLException {
+            TransportClient mockClient = mock(TransportClient.class);
+            GetRequestBuilder getRequestBuilder = spy(new 
GetRequestBuilder(mockClient, GetAction.INSTANCE));
+            if (exceptionToThrow != null) {
+                doThrow(exceptionToThrow).when(getRequestBuilder).execute();
+            } else {
+                doReturn(new 
MockGetRequestBuilderExecutor(documentExists)).when(getRequestBuilder).execute();
+            }
+            when(mockClient.prepareGet(anyString(), anyString(), 
anyString())).thenReturn(getRequestBuilder);
+
+            return mockClient;
+        }
+
+        public void callCreateElasticsearchClient(ProcessContext context) {
+            createElasticsearchClient(context);
+        }
+
+        private static class MockGetRequestBuilderExecutor
+                extends AdapterActionFuture<GetResponse, 
ActionListener<GetResponse>>
+                implements ListenableActionFuture<GetResponse> {
+
+            boolean documentExists = true;
+
+            public MockGetRequestBuilderExecutor(boolean documentExists) {
+                this.documentExists = documentExists;
+            }
+
+
+            @Override
+            protected GetResponse convert(ActionListener<GetResponse> 
bulkResponseActionListener) {
+                return null;
+            }
+
+            @Override
+            public void addListener(ActionListener<GetResponse> 
actionListener) {
+
+            }
+
+            @Override
+            public GetResponse get() throws InterruptedException, 
ExecutionException {
+                GetResponse response = mock(GetResponse.class);
+                when(response.isExists()).thenReturn(documentExists);
+                
when(response.getSourceAsBytes()).thenReturn("Success".getBytes());
+                when(response.getSourceAsString()).thenReturn("Success");
+                return response;
+            }
+
+            @Override
+            public GetResponse actionGet() {
+                try {
+                    return get();
+                } catch (Exception e) {
+                    fail(e.getMessage());
+                }
+                return null;
+            }
+        }
+    }
+
+
+    
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Integration test section below
+    //
+    // The tests below are meant to run on real ES instances, and are thus 
@Ignored during normal test execution.
+    // However if you wish to execute them as part of a test phase, comment 
out the @Ignored line for each
+    // desired test.
+    
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Tests basic ES functionality against a local or test ES cluster
+     */
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testFetchElasticsearchBasic() {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
FetchElasticsearch());
+        runner.setValidateExpressionUsage(true);
+
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+        runner.assertValid();
+
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
+        }});
+
+
+        runner.enqueue(docExample);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 
1);
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testFetchElasticsearchBatch() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new 
FetchElasticsearch());
+        runner.setValidateExpressionUsage(true);
+
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
+        runner.setProperty(FetchElasticsearch.INDEX, "doc");
+
+        runner.setProperty(FetchElasticsearch.TYPE, "status");
+        runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
+        runner.assertValid();
+
+
+        String message = convertStreamToString(docExample);
+        for (int i = 0; i < 100; i++) {
+
+            long newId = 28039652140L + i;
+            final String newStrId = Long.toString(newId);
+            runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
+                put("doc_id", newStrId);
+            }});
+
+        }
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 
100);
+    }
+
+    /**
+     * Convert an input stream to a stream
+     *
+     * @param is input the input stream
+     * @return return the converted input stream as a string
+     */
+    static String convertStreamToString(InputStream is) {
+        java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
+        return s.hasNext() ? s.next() : "";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index 6af8fd2..dc1c445 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -16,22 +16,27 @@
  */
 package org.apache.nifi.processors.elasticsearch;
 
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.bulk.BulkAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.AdapterActionFuture;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.node.NodeClosedException;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -46,21 +51,20 @@ import java.util.concurrent.ExecutionException;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 public class TestPutElasticsearch {
 
-    private InputStream twitterExample;
+    private InputStream docExample;
     private TestRunner runner;
 
     @Before
     public void setUp() throws IOException {
         ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
-        twitterExample = classloader
-                .getResourceAsStream("TweetExample.json");
-
+        docExample = classloader.getResourceAsStream("DocumentExample.json");
     }
 
     @After
@@ -70,105 +74,151 @@ public class TestPutElasticsearch {
 
     @Test
     public void testPutElasticSearchOnTrigger() throws IOException {
-        runner = TestRunners.newTestRunner(new 
ElasticsearchTestProcessor(false)); // no failures
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
 
-        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.assertNotValid();
         runner.setProperty(PutElasticsearch.TYPE, "status");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
         runner.assertNotValid();
-        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
         runner.assertValid();
 
-        runner.enqueue(twitterExample, new HashMap<String, String>() {{
-            put("tweet_id", "28039652140");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
         }});
         runner.run(1, true, true);
 
         runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
         assertNotNull(out);
-        out.assertAttributeEquals("tweet_id", "28039652140");
+        out.assertAttributeEquals("doc_id", "28039652140");
     }
 
     @Test
     public void testPutElasticSearchOnTriggerWithFailures() throws IOException 
{
-        runner = TestRunners.newTestRunner(new 
ElasticsearchTestProcessor(true)); // simulate failures
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchTestProcessor(true)); // simulate failures
         runner.setValidateExpressionUsage(false);
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
-        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.TYPE, "status");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
-        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
 
-        runner.enqueue(twitterExample, new HashMap<String, String>() {{
-            put("tweet_id", "28039652140");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
         }});
         runner.run(1, true, true);
 
         runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
         assertNotNull(out);
-        out.assertAttributeEquals("tweet_id", "28039652140");
+        out.assertAttributeEquals("doc_id", "28039652140");
     }
 
     @Test
-    public void testPutElasticSearchOnTriggerNode() throws IOException {
-        runner = TestRunners.newTestRunner(new 
ElasticsearchTestProcessor(false)); // no failures
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node");
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
+    public void testPutElasticsearchOnTriggerWithExceptions() throws 
IOException {
+        PutElasticsearchTestProcessor processor = new 
PutElasticsearchTestProcessor(false);
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
-
-        runner.setProperty(PutElasticsearch.INDEX, "tweet");
-        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.TYPE, "status");
-        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
-        runner.assertNotValid();
-        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
-        runner.assertValid();
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
 
-        runner.enqueue(twitterExample, new HashMap<String, String>() {{
-            put("tweet_id", "28039652141");
+        // No Node Available exception
+        processor.setExceptionToThrow(new NoNodeAvailableException("test"));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
         }});
         runner.run(1, true, true);
 
-        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
-        assertNotNull(out);
-        out.assertAttributeEquals("tweet_id", "28039652141");
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Elasticsearch Timeout exception
+        processor.setExceptionToThrow(new 
ElasticsearchTimeoutException("test"));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Receive Timeout Transport exception
+        processor.setExceptionToThrow(new 
ReceiveTimeoutTransportException(mock(StreamInput.class)));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652142");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Node Closed exception
+        processor.setExceptionToThrow(new 
NodeClosedException(mock(StreamInput.class)));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652143");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
+        runner.clearTransferState();
+
+        // Elasticsearch Parse exception
+        processor.setExceptionToThrow(new ElasticsearchParseException("test"));
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652144");
+        }});
+        runner.run(1, true, true);
+
+        // This test generates an exception on execute(),routes to failure
+        runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
     }
 
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
-    private static class ElasticsearchTestProcessor extends PutElasticsearch {
+    private static class PutElasticsearchTestProcessor extends 
PutElasticsearch {
         boolean responseHasFailures = false;
+        Exception exceptionToThrow = null;
 
-        public ElasticsearchTestProcessor(boolean responseHasFailures) {
+        public PutElasticsearchTestProcessor(boolean responseHasFailures) {
             this.responseHasFailures = responseHasFailures;
         }
 
+        public void setExceptionToThrow(Exception exceptionToThrow) {
+            this.exceptionToThrow = exceptionToThrow;
+        }
+
         @Override
-        @OnScheduled
-        public void createClient(ProcessContext context) throws IOException {
-            esClient = mock(Client.class);
-            BulkRequestBuilder bulkRequestBuilder = spy(new 
BulkRequestBuilder(esClient, BulkAction.INSTANCE));
-            doReturn(new 
MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute();
-            when(esClient.prepareBulk()).thenReturn(bulkRequestBuilder);
-
-            IndexRequestBuilder indexRequestBuilder = new 
IndexRequestBuilder(esClient, IndexAction.INSTANCE);
-            when(esClient.prepareIndex(anyString(), anyString(), 
anyString())).thenReturn(indexRequestBuilder);
+        public void createElasticsearchClient(ProcessContext context) throws 
ProcessException {
+            Client mockClient = mock(Client.class);
+            BulkRequestBuilder bulkRequestBuilder = spy(new 
BulkRequestBuilder(mockClient, BulkAction.INSTANCE));
+            if (exceptionToThrow != null) {
+                doThrow(exceptionToThrow).when(bulkRequestBuilder).execute();
+            } else {
+                doReturn(new 
MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute();
+            }
+            when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder);
+
+            IndexRequestBuilder indexRequestBuilder = new 
IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
+            when(mockClient.prepareIndex(anyString(), anyString(), 
anyString())).thenReturn(indexRequestBuilder);
+
+            esClient.set(mockClient);
         }
 
         private static class MockBulkRequestBuilderExecutor
@@ -195,6 +245,10 @@ public class TestPutElasticsearch {
             public BulkResponse get() throws InterruptedException, 
ExecutionException {
                 BulkResponse response = mock(BulkResponse.class);
                 when(response.hasFailures()).thenReturn(responseHasFailures);
+                BulkItemResponse item = mock(BulkItemResponse.class);
+                when(item.getItemId()).thenReturn(1);
+                when(item.isFailed()).thenReturn(true);
+                when(response.getItems()).thenReturn(new 
BulkItemResponse[]{item});
                 return response;
             }
 
@@ -212,72 +266,37 @@ public class TestPutElasticsearch {
 
     /**
      * Tests basic ES functionality against a local or test ES cluster
-     * @throws IOException
      */
     @Test
     @Ignore("Comment this out if you want to run against local or test ES")
-    public void testPutElasticSearchBasic() throws IOException {
+    public void testPutElasticSearchBasic() {
         System.out.println("Starting test " + new Object() {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearch());
         runner.setValidateExpressionUsage(false);
 
         //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, 
"transport");
         runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
 
-        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
 
         runner.setProperty(PutElasticsearch.TYPE, "status");
-        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
         runner.assertValid();
 
-        runner.enqueue(twitterExample, new HashMap<String, String>() {{
-            put("tweet_id", "28039652140");
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652140");
         }});
 
 
-        runner.enqueue(twitterExample);
+        runner.enqueue(docExample);
         runner.run(1, true, true);
 
         runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
-
-    }
-
-    @Test
-    @Ignore("Comment this out if you want to run against local or test ES")
-    public void testPutElasticSearchBasicNode() throws IOException {
-        System.out.println("Starting test " + new Object() {
-        }.getClass().getEnclosingMethod().getName());
-        final TestRunner runner = TestRunners.newTestRunner(new 
PutElasticsearch());
-        runner.setValidateExpressionUsage(false);
-
-        //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node");
-        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
-        runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, 
"/usr/local/opt/elasticsearch");
-        runner.setProperty(PutElasticsearch.INDEX, "tweet");
-        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
-
-        runner.setProperty(PutElasticsearch.TYPE, "status");
-        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
-        runner.assertValid();
-
-        runner.enqueue(twitterExample, new HashMap<String, String>() {{
-            put("tweet_id", "28039652141");
-        }});
-
-        runner.enqueue(twitterExample);
-        runner.run(1, true, true);
-
-        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
-
     }
 
     @Test
@@ -289,31 +308,25 @@ public class TestPutElasticsearch {
         runner.setValidateExpressionUsage(false);
 
         //Local Cluster - Mac pulled from brew
-        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, 
"transport");
         runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, 
"127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, 
"5s");
-        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
 
         runner.setProperty(PutElasticsearch.TYPE, "status");
-        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
         runner.assertValid();
 
 
-        JsonParser parser = new JsonParser();
-        JsonObject json;
-        String message = convertStreamToString(twitterExample);
+        String message = convertStreamToString(docExample);
         for (int i = 0; i < 100; i++) {
 
-            json = parser.parse(message).getAsJsonObject();
-            String id = json.get("id").getAsString();
-            long newId = Long.parseLong(id) + i;
+            long newId = 28039652140L + i;
             final String newStrId = Long.toString(newId);
-            //json.addProperty("id", newId);
             runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
-                put("tweet_id", newStrId);
+                put("doc_id", newStrId);
             }});
 
         }
@@ -321,8 +334,6 @@ public class TestPutElasticsearch {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 
100);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
new file mode 100644
index 0000000..014a66c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "created_at": "Thu Jan 21 16:02:46 +0000 2016",
+  "text": "This is a test document from a mock social media service",
+  "contributors": null,
+  "id": 28039652140,
+  "shares": null,
+  "geographic_location": null,
+  "userinfo": {
+    "name": "Not A. Person",
+    "location": "Orlando, FL",
+    "created_at": "Fri Oct 24 23:22:09 +0000 2008",
+    "follow_count": 1,
+    "url": "http://not.a.real.site";,
+    "id": 16958875,
+    "lang": "en",
+    "time_zone": "Mountain Time (US & Canada)",
+    "description": "I'm a test person.",
+    "following_count": 71,
+    "screen_name": "Nobody"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
deleted file mode 100644
index 7375be6..0000000
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
+++ /dev/null
@@ -1,83 +0,0 @@
-
-{
-  "coordinates": null,
-  "created_at": "Thu Oct 21 16:02:46 +0000 2010",
-  "favorited": false,
-  "truncated": false,
-  "id_str": "28039652140",
-  "entities": {
-    "urls": [
-      {
-        "expanded_url": null,
-        "url": "http://gnip.com/success_stories";,
-        "indices": [
-          69,
-          100
-        ]
-      }
-    ],
-    "hashtags": [
-
-    ],
-    "user_mentions": [
-      {
-        "name": "Gnip, Inc.",
-        "id_str": "16958875",
-        "id": 16958875,
-        "indices": [
-          25,
-          30
-        ],
-        "screen_name": "gnip"
-      }
-    ]
-  },
-  "in_reply_to_user_id_str": null,
-  "text": "what we've been up to at @gnip -- delivering data to happy 
customers http://gnip.com/success_stories";,
-  "contributors": null,
-  "id": 28039652140,
-  "retweet_count": null,
-  "in_reply_to_status_id_str": null,
-  "geo": null,
-  "retweeted": false,
-  "in_reply_to_user_id": null,
-  "user": {
-    "profile_sidebar_border_color": "C0DEED",
-    "name": "Gnip, Inc.",
-    "profile_sidebar_fill_color": "DDEEF6",
-    "profile_background_tile": false,
-    "profile_image_url": 
"http://a3.twimg.com/profile_images/62803643/icon_normal.png";,
-    "location": "Boulder, CO",
-    "created_at": "Fri Oct 24 23:22:09 +0000 2008",
-    "id_str": "16958875",
-    "follow_request_sent": false,
-    "profile_link_color": "0084B4",
-    "favourites_count": 1,
-    "url": "http://blog.gnip.com";,
-    "contributors_enabled": false,
-    "utc_offset": -25200,
-    "id": 16958875,
-    "profile_use_background_image": true,
-    "listed_count": 23,
-    "protected": false,
-    "lang": "en",
-    "profile_text_color": "333333",
-    "followers_count": 260,
-    "time_zone": "Mountain Time (US & Canada)",
-    "verified": false,
-    "geo_enabled": true,
-    "profile_background_color": "C0DEED",
-    "notifications": false,
-    "description": "Gnip makes it really easy for you to collect social data 
for your business.",
-    "friends_count": 71,
-    "profile_background_image_url": 
"http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png";,
-    "statuses_count": 302,
-    "screen_name": "gnip",
-    "following": false,
-    "show_all_inline_media": false
-  },
-  "in_reply_to_screen_name": null,
-  "source": "web",
-  "place": null,
-  "in_reply_to_status_id": null
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 8255487..31039ae 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -1,4 +1,14 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+license agreements. See the NOTICE file distributed with this work for 
additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
@@ -23,4 +33,14 @@
         <module>nifi-elasticsearch-processors</module>
     </modules>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-elasticsearch-processors</artifactId>
+                <version>0.4.2-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/75af3a2e/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 646ae1c..0cdc917 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -52,6 +52,7 @@
         <module>nifi-riemann-bundle</module>
         <module>nifi-html-bundle</module>
         <module>nifi-scripting-bundle</module>
+        <module>nifi-elasticsearch-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>

Reply via email to