This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fc2e6a02a5 NIFI-15681 - Enhance PutElasticsearchJson to support 
NDJSON, JSON Array, and Single JSON input formats with size-based batching 
(#10981)
5fc2e6a02a5 is described below

commit 5fc2e6a02a51312a9c63192bad1d4f43030f4fc4
Author: agturley <[email protected]>
AuthorDate: Tue Apr 7 07:15:32 2026 -0700

    NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Array, 
and Single JSON input formats with size-based batching (#10981)
---
 .../nifi/elasticsearch/IndexOperationRequest.java  | 109 ++-
 .../ElasticSearchClientServiceImpl.java            |  39 +-
 .../integration/ElasticSearchClientService_IT.java |  87 +-
 .../nifi/processors/elasticsearch/InputFormat.java |  59 ++
 .../elasticsearch/PutElasticsearchJson.java        | 955 ++++++++++++++++++---
 .../elasticsearch/PutElasticsearchRecord.java      |  12 +-
 .../elasticsearch/PutElasticsearchJsonTest.java    | 336 +++++++-
 .../TestElasticSearchProcessorMigration.java       |   3 +-
 8 files changed, 1438 insertions(+), 162 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index b63519e5983..6e3ac3daf93 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -17,8 +17,10 @@
 
 package org.apache.nifi.elasticsearch;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A POJO that represents an "operation on an index". It should not be 
confused with just indexing documents, as it
@@ -30,25 +32,102 @@ public class IndexOperationRequest {
     private final String type;
     private final String id;
     private final Map<String, Object> fields;
+    private final byte[] rawJsonBytes;
     private final Operation operation;
     private final Map<String, Object> script;
-
     private final boolean scriptedUpsert;
     private final Map<String, Object> dynamicTemplates;
     private final Map<String, String> headerFields;
 
-    public IndexOperationRequest(final String index, final String type, final 
String id, final Map<String, Object> fields,
-                                 final Operation operation, final Map<String, 
Object> script, final boolean scriptedUpsert,
-                                 final Map<String, Object> dynamicTemplates, 
final Map<String, String> headerFields) {
-        this.index = index;
-        this.type = type;
-        this.id = id;
-        this.fields = fields;
-        this.operation = operation;
-        this.script = script;
-        this.scriptedUpsert = scriptedUpsert;
-        this.dynamicTemplates = dynamicTemplates;
-        this.headerFields = headerFields;
+    private IndexOperationRequest(final Builder builder) {
+        this.index = builder.index;
+        this.type = builder.type;
+        this.id = builder.id;
+        this.fields = builder.fields;
+        this.rawJsonBytes = builder.rawJsonBytes;
+        this.operation = builder.operation;
+        this.script = builder.script;
+        this.scriptedUpsert = builder.scriptedUpsert;
+        this.dynamicTemplates = builder.dynamicTemplates;
+        this.headerFields = builder.headerFields;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String index;
+        private String type;
+        private String id;
+        private Map<String, Object> fields;
+        private byte[] rawJsonBytes;
+        private Operation operation;
+        private Map<String, Object> script;
+        private boolean scriptedUpsert;
+        private Map<String, Object> dynamicTemplates;
+        private Map<String, String> headerFields;
+
+        public Builder index(final String index) {
+            this.index = index;
+            return this;
+        }
+
+        public Builder type(final String type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder id(final String id) {
+            this.id = id;
+            return this;
+        }
+
+        public Builder fields(final Map<String, Object> fields) {
+            this.fields = fields;
+            return this;
+        }
+
+        public Builder rawJson(final String rawJson) {
+            this.rawJsonBytes = rawJson != null ? 
rawJson.getBytes(StandardCharsets.UTF_8) : null;
+            return this;
+        }
+
+        public Builder rawJsonBytes(final byte[] rawJsonBytes) {
+            this.rawJsonBytes = rawJsonBytes;
+            return this;
+        }
+
+        public Builder operation(final Operation operation) {
+            this.operation = operation;
+            return this;
+        }
+
+        public Builder script(final Map<String, Object> script) {
+            this.script = script;
+            return this;
+        }
+
+        public Builder scriptedUpsert(final boolean scriptedUpsert) {
+            this.scriptedUpsert = scriptedUpsert;
+            return this;
+        }
+
+        public Builder dynamicTemplates(final Map<String, Object> 
dynamicTemplates) {
+            this.dynamicTemplates = dynamicTemplates;
+            return this;
+        }
+
+        public Builder headerFields(final Map<String, String> headerFields) {
+            this.headerFields = headerFields;
+            return this;
+        }
+
+        public IndexOperationRequest build() {
+            Objects.requireNonNull(index, "Index required");
+            Objects.requireNonNull(operation, "Operation required");
+            return new IndexOperationRequest(this);
+        }
     }
 
     public String getIndex() {
@@ -67,6 +146,10 @@ public class IndexOperationRequest {
         return fields;
     }
 
+    public byte[] getRawJsonBytes() {
+        return rawJsonBytes;
+    }
+
     public Operation getOperation() {
         return operation;
     }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index dbd22cded5e..4b9aa9e6216 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.message.BasicHeader;
@@ -68,6 +69,7 @@ import org.elasticsearch.client.sniff.Sniffer;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.Proxy;
 import java.net.URI;
@@ -202,7 +204,6 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     private void createObjectMapper(final ConfigurationContext context) {
         mapper = new ObjectMapper();
         if 
(ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
 {
-            mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
             mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
         }
         prettyPrintWriter = mapper.writerWithDefaultPrettyPrinter();
@@ -624,6 +625,19 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return bulk(Collections.singletonList(operation), 
elasticsearchRequestOptions);
     }
 
+    /**
+     * ByteArrayOutputStream subclass that exposes the internal buffer without 
a defensive copy,
+     * allowing zero-copy construction of ByteArrayEntity.
+     */
+    private static final class ExposedByteArrayOutputStream extends 
ByteArrayOutputStream {
+        byte[] buf() {
+            return buf;
+        }
+        int count() {
+            return count;
+        }
+    }
+
     private String flatten(final String str) {
         return str.replaceAll("[\\n\\r]", "\\\\n");
     }
@@ -656,13 +670,18 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
         return 
flatten(mapper.writeValueAsString(Collections.singletonMap(operation, 
operationBody)));
     }
 
-    protected void buildRequest(final IndexOperationRequest request, final 
StringBuilder builder) throws JsonProcessingException {
+    protected void buildRequest(final IndexOperationRequest request, final 
OutputStream out) throws IOException {
         final String header = buildBulkHeader(request);
-        builder.append(header).append("\n");
+        out.write(header.getBytes(StandardCharsets.UTF_8));
+        out.write('\n');
         switch (request.getOperation()) {
             case Index, Create:
-                final String indexDocument = 
mapper.writeValueAsString(request.getFields());
-                builder.append(indexDocument).append("\n");
+                if (request.getRawJsonBytes() != null) {
+                    out.write(request.getRawJsonBytes());
+                } else {
+                    mapper.writeValue(out, request.getFields());
+                }
+                out.write('\n');
                 break;
             case Update, Upsert:
                 final Map<String, Object> updateBody = new HashMap<>(2, 1);
@@ -678,9 +697,9 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                         updateBody.put("doc_as_upsert", true);
                     }
                 }
-
                 final String update = 
flatten(mapper.writeValueAsString(updateBody)).trim();
-                builder.append(update).append("\n");
+                out.write(update.getBytes(StandardCharsets.UTF_8));
+                out.write('\n');
                 break;
             case Delete:
                 // nothing to do for Delete operations, it just needs the 
header
@@ -691,15 +710,15 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
     @Override
     public IndexOperationResponse bulk(final List<IndexOperationRequest> 
operations, final ElasticsearchRequestOptions elasticsearchRequestOptions) {
         try {
-            final StringBuilder payload = new StringBuilder();
+            final ExposedByteArrayOutputStream payload = new 
ExposedByteArrayOutputStream();
             for (final IndexOperationRequest or : operations) {
                 buildRequest(or, payload);
             }
 
             if (getLogger().isDebugEnabled()) {
-                getLogger().debug("{}", payload);
+                getLogger().debug("{}", 
payload.toString(StandardCharsets.UTF_8));
             }
-            final HttpEntity entity = new NStringEntity(payload.toString(), 
ContentType.APPLICATION_JSON);
+            final HttpEntity entity = new ByteArrayEntity(payload.buf(), 0, 
payload.count(), ContentType.APPLICATION_JSON);
             final StopWatch watch = new StopWatch();
             watch.start();
             final Response response = performRequest("POST", "/_bulk", 
elasticsearchRequestOptions, entity);
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 88e71407656..da85c07056d 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -599,7 +599,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
             assertNotNull(doc);
         } finally {
             // replace the deleted doc
-            service.add(new IndexOperationRequest(INDEX, type, "1", 
originalDoc, IndexOperationRequest.Operation.Index, null, false, null, null),
+            service.add(IndexOperationRequest.builder()
+                            
.index(INDEX).type(type).id("1").fields(originalDoc).operation(IndexOperationRequest.Operation.Index).build(),
                     new ElasticsearchRequestOptions(null, Map.of("Accept", 
"application/json")));
             waitForIndexRefresh(); // (affects later tests using _search or 
_bulk)
         }
@@ -719,7 +720,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
         suppressNulls(false);
         IndexOperationResponse response = service.bulk(
                 Collections.singletonList(
-                        new IndexOperationRequest("nulls", type, "1", doc, 
IndexOperationRequest.Operation.Index, null, false, null, null)
+                        IndexOperationRequest.builder()
+                                
.index("nulls").type(type).id("1").fields(doc).operation(IndexOperationRequest.Operation.Index).build()
                 ),
                 new ElasticsearchRequestOptions(null, Map.of("Accept", 
"application/json"))
         );
@@ -731,7 +733,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
 
         // suppress nulls
         suppressNulls(true);
-        response = service.bulk(Collections.singletonList(new 
IndexOperationRequest("nulls", type, "2", doc, 
IndexOperationRequest.Operation.Index, null, false, null, null)), null);
+        response = 
service.bulk(Collections.singletonList(IndexOperationRequest.builder()
+                
.index("nulls").type(type).id("2").fields(doc).operation(IndexOperationRequest.Operation.Index).build()),
 null);
         assertNotNull(response);
         waitForIndexRefresh();
 
@@ -757,12 +760,12 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
         final List<IndexOperationRequest> payload = new ArrayList<>();
         for (int x = 0; x < 20; x++) {
             final String index = x % 2 == 0 ? "bulk_a" : "bulk_b";
-            payload.add(new IndexOperationRequest(index, type, 
String.valueOf(x), Map.of("msg", "test"),
-                    IndexOperationRequest.Operation.Index, null, false, null, 
null));
+            payload.add(IndexOperationRequest.builder()
+                    
.index(index).type(type).id(String.valueOf(x)).fields(Map.of("msg", 
"test")).operation(IndexOperationRequest.Operation.Index).build());
         }
         for (int x = 0; x < 5; x++) {
-            payload.add(new IndexOperationRequest("bulk_c", type, 
String.valueOf(x), Map.of("msg", "test"),
-                    IndexOperationRequest.Operation.Index, null, false, null, 
null));
+            payload.add(IndexOperationRequest.builder()
+                    
.index("bulk_c").type(type).id(String.valueOf(x)).fields(Map.of("msg", 
"test")).operation(IndexOperationRequest.Operation.Index).build());
         }
         final IndexOperationResponse response = service.bulk(payload, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
         assertNotNull(response);
@@ -794,12 +797,14 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
         final List<IndexOperationRequest> payload = new ArrayList<>();
         for (int x = 0; x < 20; x++) {
             final String index = x % 2 == 0 ? "bulk_a" : "bulk_b";
-            payload.add(new IndexOperationRequest(index, type, 
String.valueOf(x), new MapBuilder().of("msg", "test").build(),
-                    IndexOperationRequest.Operation.Index, null, false, null, 
Collections.singletonMap("retry_on_conflict", "3")));
+            payload.add(IndexOperationRequest.builder()
+                    .index(index).type(type).id(String.valueOf(x)).fields(new 
MapBuilder().of("msg", "test").build())
+                    
.operation(IndexOperationRequest.Operation.Index).headerFields(Collections.singletonMap("retry_on_conflict",
 "3")).build());
         }
         for (int x = 0; x < 5; x++) {
-            payload.add(new IndexOperationRequest("bulk_c", type, 
String.valueOf(x), new MapBuilder().of("msg", "test").build(),
-                    IndexOperationRequest.Operation.Index, null, false, null, 
null));
+            payload.add(IndexOperationRequest.builder()
+                    
.index("bulk_c").type(type).id(String.valueOf(x)).fields(new 
MapBuilder().of("msg", "test").build())
+                    .operation(IndexOperationRequest.Operation.Index).build());
         }
         final IndexOperationResponse response = service.bulk(payload, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
         assertNotNull(response);
@@ -827,8 +832,9 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
 
     @Test
     void testUnknownBulkHeader() {
-        final IndexOperationRequest failingRequest = new 
IndexOperationRequest("bulk_c", type, "1", new MapBuilder().of("msg", 
"test").build(),
-                IndexOperationRequest.Operation.Index, null, false, null, 
Collections.singletonMap("not_exist", "true"));
+        final IndexOperationRequest failingRequest = 
IndexOperationRequest.builder()
+                .index("bulk_c").type(type).id("1").fields(new 
MapBuilder().of("msg", "test").build())
+                
.operation(IndexOperationRequest.Operation.Index).headerFields(Collections.singletonMap("not_exist",
 "true")).build();
         final ElasticsearchException ee = 
assertThrows(ElasticsearchException.class, () -> service.add(failingRequest, 
null));
         assertInstanceOf(ResponseException.class, ee.getCause());
         assertTrue(ee.getCause().getMessage().contains("Action/metadata line 
[1] contains an unknown parameter [not_exist]"));
@@ -837,8 +843,9 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
     @Test
     void testDynamicTemplates() {
         final List<IndexOperationRequest> payload = Collections.singletonList(
-                new IndexOperationRequest("dynamo", type, "1", new 
MapBuilder().of("msg", "test", "hello", "world").build(),
-                        IndexOperationRequest.Operation.Index, null, false, 
new MapBuilder().of("hello", "test_text").build(), null)
+                IndexOperationRequest.builder()
+                        .index("dynamo").type(type).id("1").fields(new 
MapBuilder().of("msg", "test", "hello", "world").build())
+                        
.operation(IndexOperationRequest.Operation.Index).dynamicTemplates(new 
MapBuilder().of("hello", "test_text").build()).build()
         );
 
         final IndexOperationResponse response = service.bulk(payload, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
@@ -862,7 +869,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
             final Map<String, Object> doc = new HashMap<>();
             doc.put("msg", "Buongiorno, mondo");
             doc.put("counter", 1);
-            service.add(new IndexOperationRequest(INDEX, type, testId, doc, 
IndexOperationRequest.Operation.Index, null, false, null, null),
+            service.add(IndexOperationRequest.builder()
+                            
.index(INDEX).type(type).id(testId).fields(doc).operation(IndexOperationRequest.Operation.Index).build(),
                     new ElasticsearchRequestOptions(Map.of("refresh", "true"), 
null));
             Map<String, Object> result = service.get(INDEX, type, testId, 
null);
             assertEquals(doc, result, "Not the same");
@@ -872,7 +880,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
             final Map<String, Object> merged = new HashMap<>();
             merged.putAll(updates);
             merged.putAll(doc);
-            IndexOperationRequest request = new IndexOperationRequest(INDEX, 
type, testId, updates, IndexOperationRequest.Operation.Update, null, false, 
null, null);
+            IndexOperationRequest request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(testId).fields(updates).operation(IndexOperationRequest.Operation.Update).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             result = service.get(INDEX, type, testId, null);
             assertTrue(result.containsKey("from"));
@@ -884,7 +893,8 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
             upsertItems.put("upsert_1", "hello");
             upsertItems.put("upsert_2", 1);
             upsertItems.put("upsert_3", true);
-            request = new IndexOperationRequest(INDEX, type, upsertedId, 
upsertItems, IndexOperationRequest.Operation.Upsert, null, false, null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(upsertedId).fields(upsertItems).operation(IndexOperationRequest.Operation.Upsert).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             result = service.get(INDEX, type, upsertedId, null);
             assertEquals(upsertItems, result);
@@ -896,13 +906,15 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
             script.put("lang", "painless");
             script.put("params", Collections.singletonMap("count", 2));
             // apply script to existing document
-            request = new IndexOperationRequest(INDEX, type, testId, 
upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(testId).fields(upsertDoc).operation(IndexOperationRequest.Operation.Upsert).script(script).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             result = service.get(INDEX, type, testId, new 
ElasticsearchRequestOptions());
             assertEquals(doc.get("msg"), result.get("msg"));
             assertEquals(3, result.get("counter"));
             // index document that doesn't already exist (don't apply script)
-            request = new IndexOperationRequest(INDEX, type, upsertScriptId, 
upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(upsertScriptId).fields(upsertDoc).operation(IndexOperationRequest.Operation.Upsert).script(script).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             result = service.get(INDEX, type, upsertScriptId, new 
ElasticsearchRequestOptions(null, null));
             assertNull(result.get("counter"));
@@ -914,31 +926,38 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
             upsertScript.put("lang", "painless");
             upsertScript.put("params", Collections.singletonMap("count", 2));
             // no script execution if doc found (without scripted_upsert)
-            request = new IndexOperationRequest(INDEX, type, scriptedUpsertId, 
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, false, 
null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert).script(upsertScript).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             assertFalse(service.documentExists(INDEX, type, scriptedUpsertId, 
new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))));
             // script execution with no doc found (with scripted_upsert) - doc 
not create, no "upsert" doc provided (empty objects suppressed)
             suppressNulls(true);
-            request = new IndexOperationRequest(INDEX, type, scriptedUpsertId, 
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, 
null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert)
+                    .script(upsertScript).scriptedUpsert(true).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             assertFalse(service.documentExists(INDEX, type, scriptedUpsertId, 
null));
             // script execution with no doc found (with scripted_upsert) - doc 
created, empty "upsert" doc provided
             suppressNulls(false);
-            request = new IndexOperationRequest(INDEX, type, scriptedUpsertId, 
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, 
null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert)
+                    .script(upsertScript).scriptedUpsert(true).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             result = service.get(INDEX, type, scriptedUpsertId, new 
ElasticsearchRequestOptions(null, null));
             assertEquals(2, result.get("counter"));
             // script execution with no doc found (with scripted_upsert) - doc 
updated
-            request = new IndexOperationRequest(INDEX, type, scriptedUpsertId, 
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, 
null, null);
+            request = IndexOperationRequest.builder()
+                    
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert)
+                    .script(upsertScript).scriptedUpsert(true).build();
             service.add(request, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
             result = service.get(INDEX, type, scriptedUpsertId, new 
ElasticsearchRequestOptions());
             assertEquals(4, result.get("counter"));
         } finally {
             final List<IndexOperationRequest> deletes = new ArrayList<>();
-            deletes.add(new IndexOperationRequest(INDEX, type, testId, null, 
IndexOperationRequest.Operation.Delete, null, false, null, null));
-            deletes.add(new IndexOperationRequest(INDEX, type, upsertedId, 
null, IndexOperationRequest.Operation.Delete, null, false, null, null));
-            deletes.add(new IndexOperationRequest(INDEX, type, upsertScriptId, 
null, IndexOperationRequest.Operation.Delete, null, false, null, null));
-            deletes.add(new IndexOperationRequest(INDEX, type, 
scriptedUpsertId, null, IndexOperationRequest.Operation.Delete, null, false, 
null, null));
+            
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(testId).operation(IndexOperationRequest.Operation.Delete).build());
+            
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(upsertedId).operation(IndexOperationRequest.Operation.Delete).build());
+            
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(upsertScriptId).operation(IndexOperationRequest.Operation.Delete).build());
+            
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(scriptedUpsertId).operation(IndexOperationRequest.Operation.Delete).build());
             assertFalse(service.bulk(deletes, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null)).hasErrors());
             waitForIndexRefresh(); // wait 1s for index refresh (doesn't 
prevent GET but affects later tests using _search or _bulk)
             assertFalse(service.documentExists(INDEX, type, testId, null));
@@ -952,12 +971,12 @@ class ElasticSearchClientService_IT extends 
AbstractElasticsearch_IT {
     @Test
     void testGetBulkResponsesWithErrors() {
         final List<IndexOperationRequest> ops = Arrays.asList(
-                new IndexOperationRequest(INDEX, type, "1", new 
MapBuilder().of("msg", "one", "intField", 1).build(),
-                        IndexOperationRequest.Operation.Index, null, false, 
null, null), // OK
-                new IndexOperationRequest(INDEX, type, "2", new 
MapBuilder().of("msg", "two", "intField", 1).build(),
-                        IndexOperationRequest.Operation.Create, null, false, 
null, null), // already exists
-                new IndexOperationRequest(INDEX, type, "1", new 
MapBuilder().of("msg", "one", "intField", "notaninteger").build(),
-                        IndexOperationRequest.Operation.Index, null, false, 
null, null) // can't parse int field
+                IndexOperationRequest.builder().index(INDEX).type(type).id("1")
+                        .fields(new MapBuilder().of("msg", "one", "intField", 
1).build()).operation(IndexOperationRequest.Operation.Index).build(), // OK
+                IndexOperationRequest.builder().index(INDEX).type(type).id("2")
+                        .fields(new MapBuilder().of("msg", "two", "intField", 
1).build()).operation(IndexOperationRequest.Operation.Create).build(), // 
already exists
+                IndexOperationRequest.builder().index(INDEX).type(type).id("1")
+                        .fields(new MapBuilder().of("msg", "one", "intField", 
"notaninteger").build()).operation(IndexOperationRequest.Operation.Index).build()
 // can't parse int field
         );
         final IndexOperationResponse response = service.bulk(ops, new 
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
         assertTrue(response.hasErrors());
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/InputFormat.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/InputFormat.java
new file mode 100644
index 00000000000..06ae48c68e4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/InputFormat.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum InputFormat implements DescribedValue {
+    NDJSON("NDJSON", "NDJSON", "One JSON object per line 
(newline-delimited)."),
+    JSON_ARRAY("JSON Array", "JSON Array", "A top-level JSON array of objects, 
streamed element-by-element for memory efficiency."),
+    SINGLE_JSON("Single JSON", "Single JSON", "The entire FlowFile is a single 
JSON document.");
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    InputFormat(final String value, final String displayName, final String 
description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static InputFormat fromValue(final String value) {
+        for (final InputFormat format : values()) {
+            if (format.value.equals(value)) {
+                return format;
+            }
+        }
+        throw new IllegalArgumentException("Unknown Input Format: " + value);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
index b52bbc4c7f2..2b10871da38 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -17,8 +17,15 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.commons.io.IOUtils;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.nifi.annotation.behavior.DynamicProperties;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -32,6 +39,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
 import org.apache.nifi.elasticsearch.ElasticsearchException;
 import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions;
@@ -41,6 +49,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.migration.PropertyConfiguration;
 import org.apache.nifi.migration.RelationshipConfiguration;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -48,20 +57,33 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StringUtils;
 
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @SupportsBatching
-@Tags({"json", "elasticsearch", "elasticsearch7", "elasticsearch8", 
"elasticsearch9", "put", "index"})
-@CapabilityDescription("An Elasticsearch put processor that uses the official 
Elastic REST client libraries. " +
-        "Each FlowFile is treated as a document to be sent to the 
Elasticsearch _bulk API. Multiple FlowFiles can be batched together into each 
Request sent to Elasticsearch.")
+@Tags({"ndjson", "json", "json array", "elasticsearch", "elasticsearch7", 
"elasticsearch8", "elasticsearch9", "put", "index"})
+@CapabilityDescription("An Elasticsearch put processor that uses the official 
Elastic REST client libraries to write JSON data. " +
+        "Supports three input formats selectable via the \"Input Content 
Format\" property: " +
+        "NDJSON (one JSON object per line), JSON Array (a top-level array of 
objects, streamed for memory efficiency), " +
+        "and Single JSON (the entire FlowFile is one document). " +
+        "FlowFiles are accumulated up to the configured Max Batch Size and 
flushed to Elasticsearch in _bulk API requests. " +
+        "Large files that exceed the batch size are automatically split into 
multiple _bulk requests.")
 @WritesAttributes({
         @WritesAttribute(attribute = "elasticsearch.put.error",
                 description = "The error message if there is an issue parsing 
the FlowFile, sending the parsed document to Elasticsearch or parsing the 
Elasticsearch response"),
@@ -95,17 +117,9 @@ import java.util.Set;
 })
 @SystemResourceConsideration(
         resource = SystemResource.MEMORY,
-        description = "The Batch of FlowFiles will be stored in memory until 
the bulk operation is performed.")
+        description = "At most one batch's worth of data will be buffered in 
memory at a time. " +
+                "The maximum memory usage is bounded by the Max Batch Size 
property.")
 public class PutElasticsearchJson extends AbstractPutElasticsearch {
-    static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
-            .name("Identifier Attribute")
-            .description("The name of the FlowFile attribute containing the 
identifier for the document. If the Index Operation is \"index\", "
-                    + "this property may be left empty or evaluate to an empty 
value, in which case the document's identifier will be "
-                    + "auto-generated by Elasticsearch. For all other Index 
Operations, the attribute must evaluate to a non-empty value.")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
-            .build();
 
     static final PropertyDescriptor SCRIPT = new PropertyDescriptor.Builder()
             .name("Script")
@@ -117,13 +131,14 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
 
     static final PropertyDescriptor SCRIPTED_UPSERT = new 
PropertyDescriptor.Builder()
             .name("Scripted Upsert")
-            .description("Whether to add the scripted_upsert flag to the 
Upsert Operation. " +
+            .description("Whether to add the scripted_upsert flag to the 
Upsert operation. " +
                     "If true, forces Elasticsearch to execute the Script 
whether or not the document exists, defaults to false. " +
-                    "If the Upsert Document provided (from FlowFile content) 
will be empty, but sure to set the " +
-                    CLIENT_SERVICE.getDisplayName() + " controller service's " 
+ ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName() +
-                    " to " + 
ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + " or no \"upsert\" 
doc will be, " +
-                    "included in the request to Elasticsearch and the 
operation will not create a new document for the script " +
-                    "to execute against, resulting in a \"not_found\" error")
+                    "If the upsert document will be empty, ensure null 
suppression is disabled so that an empty \"upsert\" doc " +
+                    "is included in the request — otherwise Elasticsearch will 
not create a new document for the script to " +
+                    "execute against, resulting in a \"not_found\" error. " +
+                    "For Single JSON mode, set the " + 
CLIENT_SERVICE.getDisplayName() + " controller service's " +
+                    ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName() 
+ " to " + ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + ". " +
+                    "For NDJSON and JSON Array modes, set the Suppress Nulls 
property on this processor to Never Suppress.")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .defaultValue("false")
@@ -145,39 +160,171 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         .required(true)
         .build();
 
+    static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description("""
+                    The maximum amount of data to send in a single 
Elasticsearch _bulk API request. \
+                    For NDJSON and JSON Array modes, FlowFiles are accumulated 
until this threshold is reached, then flushed. \
+                    For Single JSON mode, this acts as a size-based safety 
limit: if the accumulated FlowFiles exceed this size \
+                    before Max FlowFiles Per Batch is reached, the request is 
flushed early. \
+                    Elasticsearch recommends 5-15 MB per bulk request for 
optimal performance. \
+                    To disable the size-based limit, check "Set Empty String".\
+                    """)
+            .defaultValue("10 MB")
+            .addValidator((subject, input, context) -> 
StringUtils.isBlank(input)
+                    ? new 
ValidationResult.Builder().subject(subject).valid(true).build()
+                    : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject, 
input, context))
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor INPUT_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Input Content Format")
+            .description("""
+                    The format of the JSON content in each FlowFile. \
+                    NDJSON: one JSON object per line (newline-delimited). \
+                    JSON Array: a top-level JSON array of objects, streamed 
element-by-element for memory efficiency. \
+                    Single JSON: the entire FlowFile is a single JSON 
document.\
+                    """)
+            .allowableValues(InputFormat.class)
+            .defaultValue(InputFormat.SINGLE_JSON.getValue())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+            .name("Max FlowFiles Per Batch")
+            .displayName("Max FlowFiles Per Batch")
+            .description("""
+                    The maximum number of FlowFiles to include in a single 
Elasticsearch _bulk API request. \
+                    If the accumulated FlowFiles exceed Max Batch Size before 
this count is reached, the request will be flushed early.\
+                    """)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    static final PropertyDescriptor SUPPRESS_NULLS = new 
PropertyDescriptor.Builder()
+            .name("Suppress Nulls")
+            .description("""
+                    When set to Always Suppress, null and empty values are 
removed from documents before they are sent to Elasticsearch.
+                    This setting applies to NDJSON and JSON Array formats for 
Index and Create operations only. \
+                    For Single JSON, configure null suppression on the 
controller service instead.
+                    Performance note: for JSON Array the impact is negligible 
since documents are already being parsed. \
+                    For NDJSON, each line must be parsed and re-serialized 
when suppression is enabled, \
+                    which adds overhead compared to the default behaviour of 
passing lines through as raw bytes.\
+                    """)
+            .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS, 
ElasticSearchClientService.ALWAYS_SUPPRESS)
+            .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+            .required(true)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Identifier Attribute")
+            .description("""
+                    The name of the FlowFile attribute containing the 
identifier for the document. \
+                    If the Index Operation is "index", this property may be 
left empty or evaluate to an empty value, \
+                    in which case the document's identifier will be 
auto-generated by Elasticsearch. \
+                    For all other Index Operations, the attribute must 
evaluate to a non-empty value.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+            .build();
+
+    static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field")
+            .description("""
+                    The name of the field within each document to use as the 
Elasticsearch document ID. \
+                    If the field is not present in a document or this property 
is left blank, no document ID is set \
+                    and Elasticsearch will auto-generate one.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+            .name("bulk_request")
+            .description("When \"Output Bulk Request\" is enabled, the raw 
Elasticsearch _bulk API request body is written " +
+                    "to this relationship as a FlowFile for inspection or 
debugging.")
+            .build();
+
+    static final PropertyDescriptor OUTPUT_BULK_REQUEST = new 
PropertyDescriptor.Builder()
+            .name("Output Bulk Request")
+            .description("If enabled, each Elasticsearch _bulk request body is 
written as a FlowFile to the \"" +
+                    REL_BULK_REQUEST.getName() + "\" relationship. Useful for 
debugging. " +
+                    "Each FlowFile contains the full NDJSON body exactly as 
sent to Elasticsearch.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static final List<PropertyDescriptor> DESCRIPTORS = List.of(
-            ID_ATTRIBUTE,
             INDEX_OP,
             INDEX,
             TYPE,
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
+            SUPPRESS_NULLS,
+            ID_ATTRIBUTE,
+            IDENTIFIER_FIELD,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, 
REL_ERRORS);
 
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    private final AtomicBoolean bulkRequestOutputEnabled = new 
AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    private ObjectReader mapReader;
+    private ObjectWriter suppressingWriter;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
@@ -185,12 +332,31 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         config.renameProperty("put-es-json-dynamic_templates", 
DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from 
PutElasticsearchJson — default to Single JSON.
+        // Set this before migrating BATCH_SIZE so its dependsOn condition is 
satisfied when NiFi processes the property.
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), 
InputFormat.SINGLE_JSON.getValue());
+        }
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name 
used in this processor.
+        // INPUT_FORMAT is set first above so its dependsOn condition is 
satisfied when NiFi processes BATCH_SIZE.
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), 
BATCH_SIZE.getName());
+
+        // MAX_BATCH_SIZE is a new property — existing configurations had no 
byte-based limit.
+        // Preserve that behavior on upgrade by leaving the property blank 
(unbounded).
+        // The custom validator on MAX_BATCH_SIZE accepts blank values, so 
this will not be
+        // overwritten by the default of 10 MB. New processor instances get 
the 10 MB default.
+        if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+            config.setProperty(MAX_BATCH_SIZE.getName(), "");
+        }
     }
 
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to 
"original"
         config.renameRelationship("success", 
AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
@@ -198,130 +364,637 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = 
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, 
Object>>() { });
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        if (inputFormat != InputFormat.SINGLE_JSON
+                && 
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
 {
+            final ObjectMapper suppressingMapper = mapper.copy()
+                    
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
+            this.suppressingWriter = suppressingMapper.writer();
+        } else {
+            this.suppressingWriter = null;
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
-        final List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
+        final String maxBatchSizeStr = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().getValue();
+        final long maxBatchBytes = StringUtils.isNotBlank(maxBatchSizeStr)
+                ? 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue()
+                : Long.MAX_VALUE;
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        final String idAttribute = inputFormat == InputFormat.SINGLE_JSON
+                ? context.getProperty(ID_ATTRIBUTE).getValue()
+                : null;
+        final String documentIdField = inputFormat != InputFormat.SINGLE_JSON
+                ? 
context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions().getValue()
+                : null;
+        final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+                ? 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+                : Integer.MAX_VALUE;
+        int flowFilesProcessed = 0;
+
+        // Tracks all FlowFiles that were successfully parsed and submitted 
(even partially)
+        final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+        // Tracks FlowFiles that had at least one Elasticsearch document error
+        final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+        // Deferred bulk-error attributes: applied after each FlowFile's 
InputStream is closed
+        final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new 
HashMap<>();
+        // Failed local record indices per FlowFile — O(error count) memory; 
used at finalization
+        // to re-read FlowFiles once and reconstruct error/success content 
without buffering all bytes
+        final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new 
LinkedHashMap<>();
+
+        // Current chunk accumulation — operationFlowFiles and 
operationRecordIndices are parallel to operations (same index)
+        final List<FlowFile> operationFlowFiles = new ArrayList<>();
+        final List<IndexOperationRequest> operations = new ArrayList<>();
+        final List<Integer> operationRecordIndices = new ArrayList<>();
+        long totalBytesAccumulated = 0;
+        long chunkBytes = 0;
+
+        while (flowFile != null) {
+            final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+            final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+            final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+            final String flowFileIdAttribute = 
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
+
+            final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, flowFile);
+            final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+            final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+            final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, flowFile);
+            final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
+
+            boolean parseError = false;
+            try {
+                final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
+                if (InputFormat.NDJSON == inputFormat) {
+                    // NDJSON: each non-blank line is one JSON document.
+                    // Index/Create operations pass raw UTF-8 bytes directly 
to avoid Map allocation.
+                    // Update/Delete/Upsert parse into a Map so the bulk 
serializer can wrap the payload.
+                    int localRecordIndex = 0;
+                    try (final BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(session.read(flowFile), 
charset), READER_BUFFER_SIZE)) {
+                        String line;
+                        while ((line = reader.readLine()) != null) {
+                            final String trimmedLine = line.trim();
+                            if (trimmedLine.isEmpty()) {
+                                continue;
+                            }
+                            final IndexOperationRequest opRequest;
+                            final long docBytes;
+                            if (o == IndexOperationRequest.Operation.Index || 
o == IndexOperationRequest.Operation.Create) {
+                                final String id = extractId(trimmedLine, 
documentIdField, flowFileIdAttribute);
+                                final byte[] rawJsonBytes;
+                                if (suppressingWriter != null) {
+                                    // Parse to Map so NON_NULL/NON_EMPTY 
inclusion filters apply during serialization.
+                                    // JsonNode tree serialization bypasses 
JsonInclude filters.
+                                    rawJsonBytes = 
suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+                                } else {
+                                    rawJsonBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8);
+                                }
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = rawJsonBytes.length;
+                            } else {
+                                final Map<String, Object> contentMap = 
mapReader.readValue(trimmedLine);
+                                final String id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = trimmedLine.length();
+                            }
+                            operations.add(opRequest);
+                            operationFlowFiles.add(flowFile);
+                            operationRecordIndices.add(localRecordIndex++);
+                            chunkBytes += docBytes;
+                            totalBytesAccumulated += docBytes;
+                            if (chunkBytes >= maxBatchBytes) {
+                                flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                                operations.clear();
+                                operationFlowFiles.clear();
+                                operationRecordIndices.clear();
+                                chunkBytes = 0;
+                            }
+                        }
+                    }
+                } else if (InputFormat.JSON_ARRAY == inputFormat) {
+                    // JSON Array: the FlowFile may contain one or more 
top-level JSON arrays.
+                    // Arrays are read sequentially; elements within each 
array are streamed one at a
+                    // time via JsonParser to avoid loading the entire content 
into memory.
+                    // Each element is re-serialized to compact bytes for 
Index/Create,
+                    // or parsed into a Map for Update/Delete/Upsert.
+                    int localRecordIndex = 0;
+                    try (final InputStreamReader isr = new 
InputStreamReader(session.read(flowFile), charset);
+                         final JsonParser parser = 
mapper.getFactory().createParser(isr)) {
+                        JsonToken outerToken;
+                        while ((outerToken = parser.nextToken()) != null) {
+                            if (outerToken != JsonToken.START_ARRAY) {
+                                throw new IOException("Expected a JSON array 
but found: " + outerToken);
+                            }
+                            JsonToken token;
+                            while ((token = parser.nextToken()) != 
JsonToken.END_ARRAY) {
+                                if (token == null) {
+                                    throw new IOException("Malformed JSON 
Array: reached end of stream before the closing ']'. " +
+                                            "Verify the FlowFile content is a 
complete, valid JSON array.");
+                                }
+                                final long startOffset = 
parser.currentTokenLocation().getCharOffset();
+                                final IndexOperationRequest opRequest;
+                                if (o == IndexOperationRequest.Operation.Index 
|| o == IndexOperationRequest.Operation.Create) {
+                                    final long docBytes;
+                                    final byte[] rawJsonBytes;
+                                    final String id;
+                                    if (suppressingWriter != null) {
+                                        // Parse directly to Map so 
NON_NULL/NON_EMPTY inclusion filters apply during
+                                        // serialization. JsonNode tree 
serialization bypasses JsonInclude filters,
+                                        // and convertValue(node, Map) adds an 
extra serialization cycle.
+                                        final Map<String, Object> contentMap = 
mapReader.readValue(parser);
+                                        docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                        rawJsonBytes = 
suppressingWriter.writeValueAsBytes(contentMap);
+                                        id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                    } else {
+                                        final JsonNode node = 
mapper.readTree(parser);
+                                        docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                        rawJsonBytes = 
mapper.writeValueAsBytes(node);
+                                        id = extractId(node, documentIdField, 
flowFileIdAttribute);
+                                    }
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                } else {
+                                    final Map<String, Object> contentMap = 
mapReader.readValue(parser);
+                                    final long docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                    final String id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                }
+                                operations.add(opRequest);
+                                operationFlowFiles.add(flowFile);
+                                operationRecordIndices.add(localRecordIndex++);
+                                if (chunkBytes >= maxBatchBytes) {
+                                    flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                                    operations.clear();
+                                    operationFlowFiles.clear();
+                                    operationRecordIndices.clear();
+                                    chunkBytes = 0;
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    // Single JSON: the entire FlowFile is one document parsed 
into a Map.
+                    // The client service serializes the Map, preserving 
null-suppression settings.
+                    try (final InputStream in = session.read(flowFile)) {
+                        final Map<String, Object> contentMap = 
mapReader.readValue(in);
+                        final String id = 
StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
+                        final IndexOperationRequest opRequest = 
IndexOperationRequest.builder()
+                                .index(index)
+                                .type(type)
+                                .id(id)
+                                .fields(contentMap)
+                                .operation(o)
+                                .script(scriptMap)
+                                .scriptedUpsert(scriptedUpsert)
+                                .dynamicTemplates(dynamicTemplatesMap)
+                                .headerFields(bulkHeaderFields)
+                                .build();
+                        operations.add(opRequest);
+                        operationFlowFiles.add(flowFile);
+                        operationRecordIndices.add(0);
+                        final long docBytes = flowFile.getSize();
+                        chunkBytes += docBytes;
+                        totalBytesAccumulated += docBytes;
+                        if (chunkBytes >= maxBatchBytes) {
+                            flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                            operations.clear();
+                            operationFlowFiles.clear();
+                            operationRecordIndices.clear();
+                            chunkBytes = 0;
+                        }
+                    }
+                }
+            } catch (final ElasticsearchException ese) {
+                final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure.");
+                getLogger().error(msg, ese);
+                final Relationship rel = ese.isElastic() ? REL_RETRY : 
REL_FAILURE;
+                // Route only the failing in-flight chunk to retry/failure. 
FlowFiles already
+                // successfully indexed by prior _bulk requests are routed 
normally to avoid
+                // duplicate indexing if those FlowFiles are re-processed on 
retry.
+                final Set<FlowFile> inFlight = new 
LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ese, rel, session, true, 
inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new 
LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, 
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
+                return;
+            } catch (final IOException ioe) {
+                getLogger().error("Could not read FlowFile content as valid 
{}.", inputFormat, ioe);
+                removeFlowFileFromChunk(flowFile, operations, 
operationFlowFiles, operationRecordIndices);
+                flowFile = session.putAttribute(flowFile, 
"elasticsearch.put.error", ioe.getMessage());
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                parseError = true;
+            } catch (final Exception ex) {
+                getLogger().error("Failed processing records.", ex);
+                removeFlowFileFromChunk(flowFile, operations, 
operationFlowFiles, operationRecordIndices);
+                flowFile = session.putAttribute(flowFile, 
"elasticsearch.put.error", ex.getMessage());
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                parseError = true;
+            }
+
+            if (!parseError) {
+                allProcessedFlowFiles.add(flowFile);
+                // InputStream is now closed — safe to apply any deferred 
bulk-error attributes
+                applyPendingBulkErrors(flowFile, pendingBulkErrors, session);
+            }
 
-        final String idAttribute = 
context.getProperty(ID_ATTRIBUTE).getValue();
+            flowFilesProcessed++;
 
-        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
-        final List<IndexOperationRequest> operations = new 
ArrayList<>(flowFiles.size());
+            // For Single JSON, stop when the count-based batch limit is 
reached; otherwise use size limit
+            if (InputFormat.SINGLE_JSON == inputFormat ? flowFilesProcessed >= 
batchSize : totalBytesAccumulated >= maxBatchBytes) {
+                break;
+            }
 
-        for (final FlowFile input : flowFiles) {
-            addOperation(operations, originals, idAttribute, context, session, 
input);
+            flowFile = session.get();
         }
 
-        if (!originals.isEmpty()) {
+        // Flush any remaining operations (all InputStreams are closed at this 
point)
+        if (!operations.isEmpty()) {
             try {
-                final List<FlowFile> errorDocuments = 
indexDocuments(operations, originals, context, session);
-                handleResponse(context, session, errorDocuments, originals);
-                session.transfer(originals, REL_ORIGINAL);
+                flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, null, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
             } catch (final ElasticsearchException ese) {
                 final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
-                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure");
+                        ese.isElastic() ? "Routing to retry." : "Routing to 
failure.");
                 getLogger().error(msg, ese);
                 final Relationship rel = ese.isElastic() ? REL_RETRY : 
REL_FAILURE;
-                transferFlowFilesOnException(ese, rel, session, true, 
originals.toArray(new FlowFile[0]));
-            } catch (final JsonProcessingException jpe) {
-                getLogger().warn("Could not log Elasticsearch operation errors 
nor determine which documents errored.", jpe);
-                transferFlowFilesOnException(jpe, REL_ERRORS, session, true, 
originals.toArray(new FlowFile[0]));
+                final Set<FlowFile> inFlight = new 
LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ese, rel, session, true, 
inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new 
LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, 
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
+                return;
             } catch (final Exception ex) {
                 getLogger().error("Could not index documents.", ex);
-                transferFlowFilesOnException(ex, REL_FAILURE, session, false, 
originals.toArray(new FlowFile[0]));
+                final Set<FlowFile> inFlight = new 
LinkedHashSet<>(operationFlowFiles);
+                transferFlowFilesOnException(ex, REL_FAILURE, session, false, 
inFlight.toArray(new FlowFile[0]));
+                final Set<FlowFile> alreadyIndexed = new 
LinkedHashSet<>(allProcessedFlowFiles);
+                alreadyIndexed.removeAll(inFlight);
+                if (!alreadyIndexed.isEmpty()) {
+                    handleFinalResponse(context, session, errorFlowFiles, 
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+                }
                 context.yield();
+                return;
             }
+        }
+
+        if (allProcessedFlowFiles.isEmpty()) {
+            getLogger().warn("No records successfully parsed for sending to 
Elasticsearch");
         } else {
-            getLogger().warn("No FlowFiles successfully parsed for sending to 
Elasticsearch");
+            handleFinalResponse(context, session, errorFlowFiles, 
allProcessedFlowFiles, pendingErrorRecordIndices, inputFormat);
         }
     }
 
-    private void addOperation(final List<IndexOperationRequest> operations, 
final List<FlowFile> originals, final String idAttribute,
-                              final ProcessContext context, final 
ProcessSession session, FlowFile input) {
-        final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
-        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
-        final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
-        final String id = StringUtils.isNotBlank(idAttribute) && 
StringUtils.isNotBlank(input.getAttribute(idAttribute)) ? 
input.getAttribute(idAttribute) : null;
-
-        final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, input);
-        final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean();
-        final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, input);
-
-        final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, input);
-        final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
-
-        final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
-
-        try (final InputStream inStream = session.read(input)) {
-            final byte[] result = IOUtils.toByteArray(inStream);
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> contentMap = mapper.readValue(new 
String(result, charset), Map.class);
-
-            final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
-            operations.add(new IndexOperationRequest(index, type, id, 
contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap, 
bulkHeaderFields));
-
-            originals.add(input);
-        } catch (final IOException ioe) {
-            getLogger().error("Could not read FlowFile content valid JSON.", 
ioe);
-            input = session.putAttribute(input, "elasticsearch.put.error", 
ioe.getMessage());
-            session.penalize(input);
-            session.transfer(input, REL_FAILURE);
-        } catch (final Exception ex) {
-            getLogger().error("Could not index documents.", ex);
-            input = session.putAttribute(input, "elasticsearch.put.error", 
ex.getMessage());
-            session.penalize(input);
-            session.transfer(input, REL_FAILURE);
+    /**
+     * Removes all entries belonging to {@code target} from the three parallel 
lists, so that a
+     * FlowFile that failed mid-read does not leave stale references that 
would cause
+     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException} 
on the next flush.
+     */
+    private void removeFlowFileFromChunk(final FlowFile target,
+                                          final List<IndexOperationRequest> 
operations,
+                                          final List<FlowFile> 
operationFlowFiles,
+                                          final List<Integer> 
operationRecordIndices) {
+        for (int i = operations.size() - 1; i >= 0; i--) {
+            if (operationFlowFiles.get(i) == target) {
+                operations.remove(i);
+                operationFlowFiles.remove(i);
+                operationRecordIndices.remove(i);
+            }
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getMapFromAttribute(final PropertyDescriptor 
propertyDescriptor, final ProcessContext context, final FlowFile input) {
-        final String propertyValue = 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+    /**
+     * Applies any bulk-error attributes that were deferred for {@code 
flowFile} while its
+     * InputStream was open.  Must only be called after the InputStream has 
been closed.
+     */
+    private void applyPendingBulkErrors(final FlowFile flowFile,
+                                         final Map<FlowFile, List<Map<String, 
Object>>> pendingBulkErrors,
+                                         final ProcessSession session) {
+        final List<Map<String, Object>> errorList = 
pendingBulkErrors.remove(flowFile);
+        if (errorList == null) {
+            return;
+        }
         try {
-            return StringUtils.isNotBlank(propertyValue) ? 
mapper.readValue(propertyValue, Map.class) : Collections.emptyMap();
-        } catch (final JsonProcessingException jpe) {
-            throw new ProcessException(propertyDescriptor.getDisplayName() + " 
must be a String parsable into a JSON Object", jpe);
+            final Object errorObj = errorList.size() == 1 ? errorList.get(0) : 
errorList;
+            session.putAttribute(flowFile, "elasticsearch.bulk.error", 
mapper.writeValueAsString(errorObj));
+        } catch (final JsonProcessingException e) {
+            session.putAttribute(flowFile, "elasticsearch.bulk.error",
+                    String.format("{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                            e.getMessage().replace("\"", "\\\"")));
         }
     }
 
-    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> 
operations, final List<FlowFile> originals, final ProcessContext context, final 
ProcessSession session) throws IOException {
-        final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, originals.getFirst());
+    /**
+     * Sends the accumulated batch of operations to Elasticsearch via the 
_bulk API and records
+     * any per-document errors. If {@code openStreamFlowFile} is non-null, 
error attributes for
+     * that FlowFile are deferred into {@code pendingBulkErrors} so they are 
applied after its
+     * InputStream is closed (calling {@code putAttribute} on an open FlowFile 
throws
+     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}).
+     * <p>
+     * Only the local record index of each failed operation is stored (O(error 
count) memory).
+     * Error/success content is reconstructed by re-reading the FlowFile in
+     * {@link #handleFinalResponse}.
+     */
+    private void flushChunk(final List<IndexOperationRequest> operations, 
final List<FlowFile> operationFlowFiles,
+                             final List<Integer> operationRecordIndices,
+                             final Set<FlowFile> errorFlowFiles, final 
FlowFile openStreamFlowFile,
+                             final Map<FlowFile, List<Map<String, Object>>> 
pendingBulkErrors,
+                             final Map<FlowFile, Set<Integer>> 
pendingErrorRecordIndices,
+                             final ProcessContext context, final 
ProcessSession session) throws IOException {
+        if (outputBulkRequest) {
+            outputBulkRequestFlowFile(operations, operationFlowFiles, session);
+        }
+
+        final FlowFile firstFlowFile = operationFlowFiles.get(0);
+        final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, firstFlowFile);
         final IndexOperationResponse response = 
clientService.get().bulk(operations,
-                new 
ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), 
getRequestHeadersFromDynamicProperties(context, originals.getFirst())));
+                new 
ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), 
getRequestHeadersFromDynamicProperties(context, firstFlowFile)));
 
         final Map<Integer, Map<String, Object>> errors = 
findElasticsearchResponseErrors(response);
-        final List<FlowFile> errorDocuments = new ArrayList<>(errors.size());
+
+        // Attach per-FlowFile error attributes; defer putAttribute for the 
FlowFile whose
+        // InputStream is currently open (putAttribute would throw 
FlowFileHandlingException).
+        // Record only the local record index of each failure — content is 
reconstructed at finalization.
+        final Map<FlowFile, List<Map<String, Object>>> flowFileErrors = new 
HashMap<>();
         errors.forEach((index, error) -> {
-            String errorMessage;
-            try {
-                errorMessage = mapper.writeValueAsString(error);
-            } catch (JsonProcessingException e) {
-                errorMessage = String.format(
-                        "{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
-                        e.getMessage().replace("\"", "\\\"")
-                );
+            final FlowFile flowFile = operationFlowFiles.get(index);
+            errorFlowFiles.add(flowFile);
+            flowFileErrors.computeIfAbsent(flowFile, k -> new 
ArrayList<>()).add(error);
+            pendingErrorRecordIndices.computeIfAbsent(flowFile, k -> new 
HashSet<>()).add(operationRecordIndices.get(index));
+        });
+        flowFileErrors.forEach((flowFile, errorList) -> {
+            if (flowFile == openStreamFlowFile) {
+                // Defer: InputStream still open — apply after the 
try-with-resources closes
+                pendingBulkErrors.computeIfAbsent(flowFile, k -> new 
ArrayList<>()).addAll(errorList);
+            } else {
+                try {
+                    final Object errorObj = errorList.size() == 1 ? 
errorList.get(0) : errorList;
+                    session.putAttribute(flowFile, "elasticsearch.bulk.error", 
mapper.writeValueAsString(errorObj));
+                } catch (final JsonProcessingException e) {
+                    session.putAttribute(flowFile, "elasticsearch.bulk.error",
+                            String.format("{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                                    e.getMessage().replace("\"", "\\\"")));
+                }
             }
-            errorDocuments.add(session.putAttribute(originals.get(index), 
"elasticsearch.bulk.error", errorMessage));
         });
 
         if (!errors.isEmpty()) {
-            handleElasticsearchDocumentErrors(errors, session, null);
+            handleElasticsearchDocumentErrors(errors, session, firstFlowFile);
         }
+    }
+
+    /**
+     * Serializes the pending batch into an Elasticsearch _bulk NDJSON body 
and writes it to a
+     * new FlowFile on the {@code bulk_request} relationship for inspection or 
debugging.
+     * Each operation produces an action metadata line followed by a document 
line
+     * (Delete operations have no document line). Failures here are logged and 
swallowed so
+     * that a debug-output failure does not abort the actual indexing.
+     */
+    private void outputBulkRequestFlowFile(final List<IndexOperationRequest> 
operations,
+                                            final List<FlowFile> 
operationFlowFiles,
+                                            final ProcessSession session) {
+        final FlowFile parent = operationFlowFiles.get(0);
+        FlowFile bulkRequestFF = session.create(parent);
+        try (final OutputStream out = session.write(bulkRequestFF)) {
+            for (final IndexOperationRequest op : operations) {
+                // Action metadata line
+                final Map<String, Object> actionBody = new LinkedHashMap<>();
+                if (StringUtils.isNotBlank(op.getIndex())) {
+                    actionBody.put("_index", op.getIndex());
+                }
+                if (StringUtils.isNotBlank(op.getType())) {
+                    actionBody.put("_type", op.getType());
+                }
+                if (StringUtils.isNotBlank(op.getId())) {
+                    actionBody.put("_id", op.getId());
+                }
+                if (op.getDynamicTemplates() != null && 
!op.getDynamicTemplates().isEmpty()) {
+                    actionBody.put("dynamic_templates", 
op.getDynamicTemplates());
+                }
+                if (op.getHeaderFields() != null) {
+                    actionBody.putAll(op.getHeaderFields());
+                }
 
-        return errorDocuments;
+                // Upsert maps to "update" in the ES bulk API
+                final String actionName = op.getOperation() == 
IndexOperationRequest.Operation.Upsert
+                        ? "update" : op.getOperation().getValue();
+                final Map<String, Object> actionLine = Map.of(actionName, 
actionBody);
+                out.write(mapper.writeValueAsBytes(actionLine));
+                out.write('\n');
+
+                // Document line (delete has no document)
+                if (op.getOperation() != 
IndexOperationRequest.Operation.Delete) {
+                    if (op.getRawJsonBytes() != null) {
+                        // Index/Create with rawJson path — write directly, no 
Map round-trip
+                        out.write(op.getRawJsonBytes());
+                    } else {
+                        final Map<String, Object> docLine = new 
LinkedHashMap<>();
+                        if (op.getOperation() == 
IndexOperationRequest.Operation.Update
+                                || op.getOperation() == 
IndexOperationRequest.Operation.Upsert) {
+                            if (op.getScript() != null && 
!op.getScript().isEmpty()) {
+                                docLine.put("script", op.getScript());
+                                if (op.isScriptedUpsert()) {
+                                    docLine.put("scripted_upsert", true);
+                                }
+                                if (op.getFields() != null && 
!op.getFields().isEmpty()) {
+                                    docLine.put("upsert", op.getFields());
+                                }
+                            } else {
+                                docLine.put("doc", op.getFields());
+                                if (op.getOperation() == 
IndexOperationRequest.Operation.Upsert) {
+                                    docLine.put("doc_as_upsert", true);
+                                }
+                            }
+                        } else {
+                            docLine.putAll(op.getFields());
+                        }
+                        out.write(mapper.writeValueAsBytes(docLine));
+                    }
+                    out.write('\n');
+                }
+            }
+        } catch (final IOException e) {
+            getLogger().warn("Failed to write bulk request FlowFile for 
inspection.", e);
+            session.remove(bulkRequestFF);
+            return;
+        }
+        bulkRequestFF = session.putAttribute(bulkRequestFF, 
"elasticsearch.bulk.operation.count", String.valueOf(operations.size()));
+        session.transfer(bulkRequestFF, REL_BULK_REQUEST);
     }
 
-    private void handleResponse(final ProcessContext context, final 
ProcessSession session, final List<FlowFile> errorDocuments, final 
List<FlowFile> originals) {
-        // clone FlowFiles to be transferred to errors/successful as the 
originals are pass through to REL_ORIGINAL
-        final List<FlowFile> copiedErrors = 
errorDocuments.stream().map(session::clone).toList();
+    /**
+     * Extracts the document ID from a raw JSON string using a streaming 
parser.
+     * Stops as soon as the target field is found, avoiding a full tree parse.
+     * Used for Index/Create operations to avoid the Map allocation overhead.
+     */
+    private String extractId(final String rawJson, final String idAttribute, 
final String flowFileIdAttribute) throws IOException {
+        if (StringUtils.isBlank(idAttribute)) {
+            return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+        }
+        try (final JsonParser p = mapper.getFactory().createParser(rawJson)) {
+            while (p.nextToken() != null) {
+                if (idAttribute.equals(p.currentName()) && p.nextToken() != 
null && !p.currentToken().isStructStart()) {
+                    final String value = p.getText();
+                    return StringUtils.isNotBlank(value) ? value
+                            : (StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null);
+                }
+            }
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Extracts the document ID from a pre-parsed JsonNode.
+     * Used for JSON Array Index/Create operations where the node is already 
available.
+     */
+    private String extractId(final JsonNode node, final String idAttribute, 
final String flowFileIdAttribute) {
+        if (StringUtils.isBlank(idAttribute)) {
+            return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+        }
+        final JsonNode idNode = node.get(idAttribute);
+        if (idNode != null && !idNode.isNull()) {
+            return idNode.asText();
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Resolves the document ID for Update/Delete/Upsert operations from the 
already-parsed content Map.
+     * Falls back to the FlowFile attribute value when the field is absent 
from the document.
+     */
+    private String resolveId(final Map<String, Object> contentMap, final 
String idAttribute, final String flowFileIdAttribute) {
+        if (StringUtils.isBlank(idAttribute)) {
+            return null;
+        }
+        final Object idObj = contentMap.get(idAttribute);
+        if (idObj != null) {
+            return idObj.toString();
+        }
+        return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
+    }
+
+    /**
+     * Reads a processor property as a JSON Object string and deserializes it 
into a Map.
+     * Returns an empty Map when the property is blank. Throws {@link 
ProcessException} if the
+     * value is not valid JSON or not a JSON Object.
+     */
+    private Map<String, Object> getMapFromAttribute(final PropertyDescriptor 
propertyDescriptor, final ProcessContext context, final FlowFile input) {
+        final String propertyValue = 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+        try {
+            return StringUtils.isNotBlank(propertyValue) ? 
mapper.readValue(propertyValue, new TypeReference<Map<String, Object>>() { }) : 
Collections.emptyMap();
+        } catch (final JsonProcessingException jpe) {
+            throw new ProcessException(propertyDescriptor.getDisplayName() + " 
must be a String parsable into a JSON Object", jpe);
+        }
+    }
+
+    /**
+     * Routes all successfully processed FlowFiles to their final 
relationships after all
+     * _bulk requests for the trigger have completed.
+     * <ul>
+     *   <li>FlowFiles with no errors are cloned to {@code REL_SUCCESSFUL} 
without re-reading.</li>
+     *   <li>FlowFiles with errors are re-read exactly once to split records 
by failed indices:
+     *       failed records go to a clone on {@code REL_ERRORS}; successful 
records (if any) go
+     *       to a clone on {@code REL_SUCCESSFUL}. For SINGLE_JSON the 
FlowFile is always a single
+     *       document so a direct clone is used instead of re-reading.</li>
+     *   <li>All FlowFiles go to {@code REL_ORIGINAL}.</li>
+     * </ul>
+     */
+    private void handleFinalResponse(final ProcessContext context, final 
ProcessSession session,
+                                     final Set<FlowFile> errorFlowFiles, final 
Set<FlowFile> allFlowFiles,
+                                     final Map<FlowFile, Set<Integer>> 
pendingErrorRecordIndices,
+                                     final InputFormat inputFormat) {
+        final List<FlowFile> copiedErrors = new ArrayList<>();
+        final List<FlowFile> successfulDocuments = new ArrayList<>();
+
+        for (final FlowFile ff : allFlowFiles) {
+            if (!errorFlowFiles.contains(ff)) {
+                // All records succeeded: clone the original without re-reading
+                successfulDocuments.add(session.clone(ff));
+            } else if (inputFormat == InputFormat.SINGLE_JSON) {
+                // One document per FlowFile — it failed entirely; clone 
directly
+                copiedErrors.add(session.clone(ff));
+            } else {
+                // NDJSON or JSON Array: re-read once and split records into 
error/success by index
+                final Set<Integer> failedIndices = 
pendingErrorRecordIndices.getOrDefault(ff, Collections.emptySet());
+                final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(ff).getValue();
+                final ByteArrayOutputStream errorBaos = new 
ByteArrayOutputStream();
+                final ByteArrayOutputStream successBaos = new 
ByteArrayOutputStream();
+                session.read(ff, in -> splitRecords(in, failedIndices, 
charset, inputFormat, errorBaos, successBaos));
+
+                if (errorBaos.size() > 0) {
+                    final byte[] errorBytes = errorBaos.toByteArray();
+                    FlowFile errorFf = session.clone(ff);
+                    errorFf = session.write(errorFf, out -> 
out.write(errorBytes));
+                    copiedErrors.add(errorFf);
+                }
+                if (successBaos.size() > 0) {
+                    final byte[] successBytes = successBaos.toByteArray();
+                    FlowFile successFf = session.clone(ff);
+                    successFf = session.write(successFf, out -> 
out.write(successBytes));
+                    successFf = session.removeAttribute(successFf, 
"elasticsearch.bulk.error");
+                    successfulDocuments.add(successFf);
+                }
+            }
+        }
+
         session.transfer(copiedErrors, REL_ERRORS);
         copiedErrors.forEach(e ->
                 session.getProvenanceReporter().send(
@@ -334,7 +1007,6 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
                 )
         );
 
-        final List<FlowFile> successfulDocuments = originals.stream().filter(f 
-> !errorDocuments.contains(f)).map(session::clone).toList();
         session.transfer(successfulDocuments, REL_SUCCESSFUL);
         successfulDocuments.forEach(s ->
                 session.getProvenanceReporter().send(
@@ -345,5 +1017,84 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
                         )
                 )
         );
+
+        session.transfer(allFlowFiles, REL_ORIGINAL);
+    }
+
+    /**
+     * Reads records from {@code in} according to {@code inputFormat} and 
writes each record
+     * (as compact single-line JSON followed by {@code '\n'}) to either {@code 
errorOut} or
+     * {@code successOut} depending on whether its zero-based index is in 
{@code failedIndices}.
+     * <p>
+     * The {@code InputStream} is owned by the NiFi session and must not be 
closed here;
+     * {@code BufferedReader} and {@code JsonParser} wrappers are 
intentionally not closed so
+     * that they do not propagate a close to the underlying stream.
+     */
+    private void splitRecords(final InputStream in, final Set<Integer> 
failedIndices, final String charset,
+                               final InputFormat inputFormat, final 
OutputStream errorOut,
+                               final OutputStream successOut) throws 
IOException {
+        // Track whether each output stream has received its first record so 
we can write '\n'
+        // between records (not after the last one), avoiding a trailing blank 
line.
+        boolean firstError = true;
+        boolean firstSuccess = true;
+        int idx = 0;
+        if (inputFormat == InputFormat.NDJSON) {
+            // BufferedReader not closed intentionally — 'in' is 
session-managed
+            final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in, charset), READER_BUFFER_SIZE);
+            String line;
+            while ((line = reader.readLine()) != null) {
+                final String trimmed = line.trim();
+                if (trimmed.isEmpty()) {
+                    continue;
+                }
+                final byte[] bytes = trimmed.getBytes(StandardCharsets.UTF_8);
+                if (failedIndices.contains(idx)) {
+                    if (!firstError) {
+                        errorOut.write('\n');
+                    }
+                    errorOut.write(bytes);
+                    firstError = false;
+                } else {
+                    if (!firstSuccess) {
+                        successOut.write('\n');
+                    }
+                    successOut.write(bytes);
+                    firstSuccess = false;
+                }
+                idx++;
+            }
+        } else {
+            // JSON Array: stream elements one at a time via JsonParser
+            // JsonParser not closed intentionally — 'in' is session-managed
+            final JsonParser parser = mapper.getFactory().createParser(new 
InputStreamReader(in, charset));
+            JsonToken outerToken;
+            while ((outerToken = parser.nextToken()) != null) {
+                if (outerToken != JsonToken.START_ARRAY) {
+                    continue;
+                }
+                JsonToken token;
+                while ((token = parser.nextToken()) != JsonToken.END_ARRAY) {
+                    if (token == null) {
+                        break;
+                    }
+                    final JsonNode node = mapper.readTree(parser);
+                    final byte[] bytes = mapper.writeValueAsBytes(node);
+                    if (failedIndices.contains(idx)) {
+                        if (!firstError) {
+                            errorOut.write('\n');
+                        }
+                        errorOut.write(bytes);
+                        firstError = false;
+                    } else {
+                        if (!firstSuccess) {
+                            successOut.write('\n');
+                        }
+                        successOut.write(bytes);
+                        firstSuccess = false;
+                    }
+                    idx++;
+                }
+            }
+        }
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 2a0fd4a318d..62b5865f551 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -516,7 +516,17 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
             contentMap.putIfAbsent("@timestamp", atTimestamp);
         }
 
-        operationList.add(new IndexOperationRequest(index, type, id, 
contentMap, indexOp, script, scriptedUpsert, dynamicTemplates, 
bulkHeaderFields));
+        operationList.add(IndexOperationRequest.builder()
+                .index(index)
+                .type(type)
+                .id(id)
+                .fields(contentMap)
+                .operation(indexOp)
+                .script(script)
+                .scriptedUpsert(scriptedUpsert)
+                .dynamicTemplates(dynamicTemplates)
+                .headerFields(bulkHeaderFields)
+                .build());
     }
 
     private void operate(final List<IndexOperationRequest> operationList, 
final List<Record> processedRecords, final List<Record> originalRecords,
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
index 37f0249f578..f023920120e 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
@@ -40,6 +40,7 @@ import java.util.function.Consumer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -82,7 +83,7 @@ public class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest {
         super.setup();
 
         runner.setProperty(PutElasticsearchJson.ID_ATTRIBUTE, "doc_id");
-        runner.setProperty(AbstractPutElasticsearch.BATCH_SIZE, "1");
+        runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "1");
 
         runner.assertValid();
     }
@@ -547,4 +548,337 @@ public class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest {
         final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE).getFirst();
         
assertTrue(flowFile.getAttribute("elasticsearch.put.error").contains("not"));
     }
+
+    // 
-------------------------------------------------------------------------
+    // NDJSON format tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testNdjsonFormat() {
+        // Switch to NDJSON mode (one JSON object per line)
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.NDJSON.getValue());
+        runner.assertValid();
+
+        // Three documents in a single FlowFile
+        final String ndjson = "{\"id\":\"1\",\"msg\":\"hello\"}\n"
+                + "{\"id\":\"2\",\"msg\":\"world\"}\n"
+                + "{\"id\":\"3\",\"msg\":\"foo\"}\n";
+
+        // Capture the operations sent to Elasticsearch
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(ndjson);
+        runner.run();
+
+        // All three documents indexed in one bulk call → one FlowFile through
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+        assertEquals(3, operationCount[0], "Expected 3 index operations from 
NDJSON FlowFile");
+    }
+
+    @Test
+    void testNdjsonFormatBlankLinesIgnored() {
+        // Blank lines between NDJSON records should be silently skipped
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.NDJSON.getValue());
+        runner.assertValid();
+
+        final String ndjsonWithBlanks = 
"\n{\"id\":\"1\",\"msg\":\"hello\"}\n\n{\"id\":\"2\",\"msg\":\"world\"}\n\n";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(ndjsonWithBlanks);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertEquals(2, operationCount[0], "Expected 2 operations — blank 
lines should be ignored");
+    }
+
+    @Test
+    void testNdjsonFormatInvalidLineRoutesToFailure() {
+        // A malformed JSON line should route the whole FlowFile to failure.
+        // IDENTIFIER_FIELD forces JSON parsing per line (via streaming 
parser), which catches malformed input.
+        // Without it, lines are passed as raw bytes and validation happens 
server-side.
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.NDJSON.getValue());
+        runner.setProperty(PutElasticsearchJson.IDENTIFIER_FIELD, "id");
+        runner.assertValid();
+
+        runner.enqueue("{\"id\":\"1\"}\nnot-json\n{\"id\":\"3\"}");
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+
+        final MockFlowFile failed = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_FAILURE).getFirst();
+        assertNotNull(failed.getAttribute("elasticsearch.put.error"),
+                "Failed FlowFile should carry elasticsearch.put.error 
attribute");
+    }
+
+    @Test
+    void testNdjsonFormatSizeBatchFlush() {
+        // Set MAX_BATCH_SIZE very small so each document forces its own flush,
+        // resulting in multiple bulk calls for a single FlowFile.
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.NDJSON.getValue());
+        runner.setProperty(PutElasticsearchJson.MAX_BATCH_SIZE, "1 B");
+        runner.assertValid();
+
+        final String ndjson = 
"{\"id\":\"1\"}\n{\"id\":\"2\"}\n{\"id\":\"3\"}\n";
+
+        final int[] bulkCallCount = {0};
+        clientService.setEvalConsumer(items -> bulkCallCount[0]++);
+
+        runner.enqueue(ndjson);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertTrue(bulkCallCount[0] > 1, "Expected multiple bulk flushes when 
MAX_BATCH_SIZE is tiny");
+    }
+
+    @Test
+    void testNdjsonFormatPartialFailureIsolatesSuccessAndErrorDocs() throws 
Exception {
+        // When an NDJSON FlowFile contains records that partially fail, the 
successful and failed
+        // documents should be routed to REL_SUCCESSFUL and REL_ERRORS 
respectively, each as a
+        // new NDJSON clone containing only those documents.
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.NDJSON.getValue());
+        runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, 
"true");
+        
clientService.setResponse(IndexOperationResponse.fromJsonResponse(sampleErrorResponse));
+
+        // 7 records in one FlowFile; sampleErrorResponse has errors at 
indices 4, 5, 6
+        final String ndjson = "{\"id\":\"1\",\"field2\":\"20\"}\n"
+                + "{\"id\":\"2\",\"field2\":\"20\"}\n"
+                + "{\"id\":\"3\",\"field2\":\"20\"}\n"
+                + "{\"id\":\"4\",\"field2\":\"not_found\"}\n"
+                + "{\"id\":\"5\",\"field2\":\"20abcd\"}\n"
+                + "{\"id\":\"6\",\"field2\":\"213,456.9\"}\n"
+                + "{\"id\":\"7\",\"field2\":\"unit test\"}\n";
+
+        runner.enqueue(ndjson);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+
+        // REL_SUCCESSFUL clone contains only the 4 successfully-indexed 
documents
+        final MockFlowFile successFf = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst();
+        final String successContent = successFf.getContent();
+        assertTrue(successContent.contains("\"id\":\"1\""), "Successful clone 
should include record 1");
+        assertTrue(successContent.contains("\"id\":\"2\""), "Successful clone 
should include record 2");
+        assertTrue(successContent.contains("\"id\":\"3\""), "Successful clone 
should include record 3");
+        assertTrue(successContent.contains("not_found"), "Successful clone 
should include not_found record (treated as success)");
+        assertFalse(successContent.contains("20abcd"), "Successful clone 
should not include failed record 5");
+        assertFalse(successContent.contains("213,456.9"), "Successful clone 
should not include failed record 6");
+        assertFalse(successContent.contains("unit test"), "Successful clone 
should not include failed record 7");
+
+        // REL_ERRORS clone contains only the 3 failed documents
+        final MockFlowFile errorFf = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst();
+        final String errorContent = errorFf.getContent();
+        assertTrue(errorContent.contains("20abcd"), "Error clone should 
include failed record 5");
+        assertTrue(errorContent.contains("213,456.9"), "Error clone should 
include failed record 6");
+        assertTrue(errorContent.contains("unit test"), "Error clone should 
include failed record 7");
+        assertFalse(errorContent.contains("\"id\":\"1\""), "Error clone should 
not include successful record 1");
+        errorFf.assertAttributeExists("elasticsearch.bulk.error");
+    }
+
+    // 
-------------------------------------------------------------------------
+    // JSON Array format tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testJsonArrayFormat() {
+        // Switch to JSON Array mode (top-level array of objects)
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        final String jsonArray = 
"[{\"id\":\"1\",\"msg\":\"hello\"},{\"id\":\"2\",\"msg\":\"world\"},{\"id\":\"3\",\"msg\":\"foo\"}]";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(jsonArray);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+        assertEquals(3, operationCount[0], "Expected 3 index operations from 
JSON Array FlowFile");
+    }
+
+    @Test
+    void testJsonArrayFormatMultipleArraysInOneFlowFile() {
+        // A FlowFile may contain multiple top-level JSON arrays; all should 
be indexed
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        // Two arrays concatenated in one FlowFile
+        final String twoArrays = 
"[{\"id\":\"1\"},{\"id\":\"2\"}][{\"id\":\"3\"},{\"id\":\"4\"}]";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(twoArrays);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertEquals(4, operationCount[0], "Expected 4 operations — 2 per 
array, 2 arrays");
+    }
+
+    @Test
+    void testJsonArrayFormatMalformedRoutesToFailure() {
+        // A truncated / unclosed array should route to failure with a clear 
error message
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        // Missing closing ']'
+        runner.enqueue("[{\"id\":\"1\"},{\"id\":\"2\"}");
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+
+        final MockFlowFile failed = 
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_FAILURE).getFirst();
+        assertNotNull(failed.getAttribute("elasticsearch.put.error"),
+                "Failed FlowFile should carry elasticsearch.put.error 
attribute");
+    }
+
+    @Test
+    void testJsonArrayFormatPrettyPrinted() {
+        // Pretty-printed (multi-line) JSON array — newlines inside the array 
must not confuse the parser
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        final String prettyArray =
+                "[\n"
+                + "  { \"id\": \"1\", \"msg\": \"hello\" },\n"
+                + "  { \"id\": \"2\", \"msg\": \"world\" },\n"
+                + "  { \"id\": \"3\", \"msg\": \"foo\" }\n"
+                + "]\n";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(prettyArray);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertEquals(3, operationCount[0], "Expected 3 operations from 
pretty-printed JSON array");
+    }
+
+    @Test
+    void testJsonArrayFormatMultiplePrettyPrintedArrays() {
+        // Multiple pretty-printed arrays in one FlowFile (newlines between 
and within arrays)
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        final String twoArraysPretty =
+                "[\n"
+                + "  { \"id\": \"1\" },\n"
+                + "  { \"id\": \"2\" }\n"
+                + "]\n"
+                + "[\n"
+                + "  { \"id\": \"3\" },\n"
+                + "  { \"id\": \"4\" }\n"
+                + "]\n";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(twoArraysPretty);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertEquals(4, operationCount[0], "Expected 4 operations from two 
pretty-printed arrays");
+    }
+
+    @Test
+    void testJsonArrayFormatMixedCompactAndPretty() {
+        // One compact array followed by one pretty-printed array in the same 
FlowFile
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        final String mixed =
+                "[{\"id\":\"1\"},{\"id\":\"2\"}]\n"
+                + "[\n"
+                + "  { \"id\": \"3\" },\n"
+                + "  { \"id\": \"4\" }\n"
+                + "]\n";
+
+        final int[] operationCount = {0};
+        clientService.setEvalConsumer(items -> operationCount[0] += 
items.size());
+
+        runner.enqueue(mixed);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        assertEquals(4, operationCount[0], "Expected 4 operations from mixed 
compact + pretty arrays");
+    }
+
+    @Test
+    void testJsonArrayFormatNotAnArrayRoutesToFailure() {
+        // Content that starts with something other than '[' is not a JSON 
array
+        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, 
InputFormat.JSON_ARRAY.getValue());
+        runner.assertValid();
+
+        runner.enqueue("{\"id\":\"1\"}");
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+    }
+
+    // 
-------------------------------------------------------------------------
+    // OUTPUT_BULK_REQUEST relationship test
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testOutputBulkRequestEnabled() {
+        // When Output Bulk Request is true, the raw NDJSON body should be 
written
+        // to the bulk_request relationship in addition to normal processing.
+        runner.setProperty(PutElasticsearchJson.OUTPUT_BULK_REQUEST, "true");
+        runner.assertValid();
+
+        runner.enqueue(flowFileContents);
+        runner.run();
+
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        runner.assertTransferCount(PutElasticsearchJson.REL_BULK_REQUEST, 1);
+
+        final MockFlowFile bulkRequest = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_BULK_REQUEST).getFirst();
+        // The bulk request body must contain the action metadata line and the 
document line
+        final String body = bulkRequest.getContent();
+        assertTrue(body.contains("\"index\""), "Bulk request body should 
contain action metadata");
+        assertTrue(body.contains("msg"), "Bulk request body should contain 
document field from flowFileContents");
+    }
+
+    @Test
+    void testOutputBulkRequestDisabledByDefault() {
+        // The bulk_request relationship should not appear unless explicitly 
enabled
+        runner.assertValid();
+
+        runner.enqueue(flowFileContents);
+        runner.run();
+
+        runner.assertTransferCount(PutElasticsearchJson.REL_BULK_REQUEST, 0);
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
index f7b55cada78..e1724b7ab7a 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
@@ -132,7 +132,8 @@ public class TestElasticSearchProcessorMigration {
                 "put-es-json-scripted-upsert", 
PutElasticsearchJson.SCRIPTED_UPSERT.getName(),
                 "put-es-json-dynamic_templates", 
PutElasticsearchJson.DYNAMIC_TEMPLATES.getName(),
                 "put-es-json-charset", PutElasticsearchJson.CHARSET.getName(),
-                "put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName()
+                "put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName(),
+                AbstractPutElasticsearch.BATCH_SIZE.getName(), 
PutElasticsearchJson.BATCH_SIZE.getName()
         );
 
         final Map<String, String> expectedRenamed = new 
HashMap<>(expectedOwnRenamed);

Reply via email to