agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r2908823688


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java:
##########
@@ -547,4 +547,287 @@ void testInvalidInput() {
         final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE).getFirst();
         assertTrue(flowFile.getAttribute("elasticsearch.put.error").contains("not"));
     }
+
+    // -------------------------------------------------------------------------
+    // NDJSON format tests
+    // -------------------------------------------------------------------------
+
+    @Test
+    void testNdjsonFormat() {
+        // Switch to NDJSON mode (one JSON object per line)
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
PutElasticsearchJson.FORMAT_NDJSON);
+        runner.assertValid();
+
+        // Three documents in a single FlowFile
+        final String ndjson = "{\"id\":\"1\",\"msg\":\"hello\"}\n"
+                + "{\"id\":\"2\",\"msg\":\"world\"}\n"
+                + "{\"id\":\"3\",\"msg\":\"foo\"}\n";
+
+        // Capture the operations sent to Elasticsearch
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(ndjson);
+        runner.run();
+
+        // All three documents indexed in one bulk call → one FlowFile through
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+        assertEquals(3, operationCount[0], "Expected 3 index operations from NDJSON FlowFile");
+    }
+
+    @Test
+    void testNdjsonFormatBlankLinesIgnored() {
+        // Blank lines between NDJSON records should be silently skipped
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
PutElasticsearchJson.FORMAT_NDJSON);
+        runner.assertValid();
+
+        final String ndjsonWithBlanks = 
"\n{\"id\":\"1\",\"msg\":\"hello\"}\n\n{\"id\":\"2\",\"msg\":\"world\"}\n\n";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(ndjsonWithBlanks);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertEquals(2, operationCount[0], "Expected 2 operations — blank lines should be ignored");
+    }
+
+    @Test
+    void testNdjsonFormatInvalidLineRoutesToFailure() {
+        // A malformed JSON line should route the whole FlowFile to failure
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
PutElasticsearchJson.FORMAT_NDJSON);
+        runner.assertValid();
+
+        runner.enqueue("{\"id\":\"1\"}\nnot-json\n{\"id\":\"3\"}");
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+
+        final MockFlowFile failed = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_FAILURE).getFirst();
+        assertTrue(failed.getAttribute("elasticsearch.put.error") != null,

Review Comment:
   adjusted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to