NIFI-3087: This closes #1263. Added unit tests to PutElasticsearch(Http) to illustrate issue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7fc7494b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7fc7494b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7fc7494b

Branch: refs/heads/master
Commit: 7fc7494b2174340a3caa0292b4a54859eaf16a34
Parents: d3dbac5
Author: Matt Burgess <[email protected]>
Authored: Tue Nov 22 20:07:31 2016 -0500
Committer: joewitt <[email protected]>
Committed: Wed Nov 23 08:22:34 2016 -0500

----------------------------------------------------------------------
 .../elasticsearch/TestPutElasticsearch.java     | 22 ++++++++++++++------
 .../elasticsearch/TestPutElasticsearchHttp.java | 16 +++++++-------
 2 files changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7fc7494b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index d7fb439..4e6a820 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -112,15 +112,19 @@ public class TestPutElasticsearch {
         
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL,
 "5s");
         runner.setProperty(PutElasticsearch.INDEX, "doc");
         runner.setProperty(PutElasticsearch.TYPE, "status");
-        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "2");
         runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
 
         runner.enqueue(docExample, new HashMap<String, String>() {{
             put("doc_id", "28039652140");
         }});
+        runner.enqueue(docExample, new HashMap<String, String>() {{
+            put("doc_id", "28039652141");
+        }});
         runner.run(1, true, true);
 
-        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearch.REL_SUCCESS, 1);
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
         assertNotNull(out);
         out.assertAttributeEquals("doc_id", "28039652140");
@@ -349,10 +353,16 @@ public class TestPutElasticsearch {
             public BulkResponse get() throws InterruptedException, 
ExecutionException {
                 BulkResponse response = mock(BulkResponse.class);
                 when(response.hasFailures()).thenReturn(responseHasFailures);
-                BulkItemResponse item = mock(BulkItemResponse.class);
-                when(item.getItemId()).thenReturn(1);
-                when(item.isFailed()).thenReturn(true);
-                when(response.getItems()).thenReturn(new 
BulkItemResponse[]{item});
+                BulkItemResponse item1 = mock(BulkItemResponse.class);
+                BulkItemResponse item2 = mock(BulkItemResponse.class);
+                when(item1.getItemId()).thenReturn(1);
+                when(item1.isFailed()).thenReturn(true);
+                BulkItemResponse.Failure failure = 
mock(BulkItemResponse.Failure.class);
+                when(failure.getMessage()).thenReturn("Bad message");
+                when(item1.getFailure()).thenReturn(failure);
+                when(item2.getItemId()).thenReturn(2);
+                when(item2.isFailed()).thenReturn(false);
+                when(response.getItems()).thenReturn(new 
BulkItemResponse[]{item1, item2});
                 return response;
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fc7494b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 1172004..9ce578f 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -201,13 +201,15 @@ public class TestPutElasticsearchHttp {
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
         runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttp.TYPE, "status");
-        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "2");
         runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
 
         runner.enqueue(docExample);
+        runner.enqueue(docExample);
         runner.run(1, true, true);
 
-        runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 
1);
+        runner.assertTransferCount(PutElasticsearchHttp.REL_FAILURE, 1);
+        runner.assertTransferCount(PutElasticsearchHttp.REL_SUCCESS, 1);
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0);
         assertNotNull(out);
     }
@@ -308,12 +310,12 @@ public class TestPutElasticsearchHttp {
                         
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
                         
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed 
to parse [gender]\",");
                         
sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected
 end-of-input in VALUE_STRING\\n at ");
-                        sb.append("[Source: 
org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, 
column: 39]\"}}}}");
-                    } else {
-                        
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
-                        sb.append(statusCode);
-                        sb.append(",\"_source\":{\"text\": \"This is a test 
document\"}}}");
+                        sb.append("[Source: 
org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, 
column: 39]\"}}}},");
                     }
+                    
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":");
+                    sb.append(statusCode);
+                    sb.append(",\"_source\":{\"text\": \"This is a test 
document\"}}}");
+
                     sb.append("]}");
                     Response mockResponse = new Response.Builder()
                             .request(realRequest)

Reply via email to