This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5fc2e6a02a5 NIFI-15681 - Enhance PutElasticsearchJson to support
NDJSON, JSON Array, and Single JSON input formats with size-based batching
(#10981)
5fc2e6a02a5 is described below
commit 5fc2e6a02a51312a9c63192bad1d4f43030f4fc4
Author: agturley <[email protected]>
AuthorDate: Tue Apr 7 07:15:32 2026 -0700
NIFI-15681 - Enhance PutElasticsearchJson to support NDJSON, JSON Array,
and Single JSON input formats with size-based batching (#10981)
---
.../nifi/elasticsearch/IndexOperationRequest.java | 109 ++-
.../ElasticSearchClientServiceImpl.java | 39 +-
.../integration/ElasticSearchClientService_IT.java | 87 +-
.../nifi/processors/elasticsearch/InputFormat.java | 59 ++
.../elasticsearch/PutElasticsearchJson.java | 955 ++++++++++++++++++---
.../elasticsearch/PutElasticsearchRecord.java | 12 +-
.../elasticsearch/PutElasticsearchJsonTest.java | 336 +++++++-
.../TestElasticSearchProcessorMigration.java | 3 +-
8 files changed, 1438 insertions(+), 162 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
index b63519e5983..6e3ac3daf93 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java
@@ -17,8 +17,10 @@
package org.apache.nifi.elasticsearch;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* A POJO that represents an "operation on an index". It should not be
confused with just indexing documents, as it
@@ -30,25 +32,102 @@ public class IndexOperationRequest {
private final String type;
private final String id;
private final Map<String, Object> fields;
+ private final byte[] rawJsonBytes;
private final Operation operation;
private final Map<String, Object> script;
-
private final boolean scriptedUpsert;
private final Map<String, Object> dynamicTemplates;
private final Map<String, String> headerFields;
- public IndexOperationRequest(final String index, final String type, final
String id, final Map<String, Object> fields,
- final Operation operation, final Map<String,
Object> script, final boolean scriptedUpsert,
- final Map<String, Object> dynamicTemplates,
final Map<String, String> headerFields) {
- this.index = index;
- this.type = type;
- this.id = id;
- this.fields = fields;
- this.operation = operation;
- this.script = script;
- this.scriptedUpsert = scriptedUpsert;
- this.dynamicTemplates = dynamicTemplates;
- this.headerFields = headerFields;
+ private IndexOperationRequest(final Builder builder) {
+ this.index = builder.index;
+ this.type = builder.type;
+ this.id = builder.id;
+ this.fields = builder.fields;
+ this.rawJsonBytes = builder.rawJsonBytes;
+ this.operation = builder.operation;
+ this.script = builder.script;
+ this.scriptedUpsert = builder.scriptedUpsert;
+ this.dynamicTemplates = builder.dynamicTemplates;
+ this.headerFields = builder.headerFields;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String index;
+ private String type;
+ private String id;
+ private Map<String, Object> fields;
+ private byte[] rawJsonBytes;
+ private Operation operation;
+ private Map<String, Object> script;
+ private boolean scriptedUpsert;
+ private Map<String, Object> dynamicTemplates;
+ private Map<String, String> headerFields;
+
+ public Builder index(final String index) {
+ this.index = index;
+ return this;
+ }
+
+ public Builder type(final String type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder id(final String id) {
+ this.id = id;
+ return this;
+ }
+
+ public Builder fields(final Map<String, Object> fields) {
+ this.fields = fields;
+ return this;
+ }
+
+ public Builder rawJson(final String rawJson) {
+ this.rawJsonBytes = rawJson != null ?
rawJson.getBytes(StandardCharsets.UTF_8) : null;
+ return this;
+ }
+
+ public Builder rawJsonBytes(final byte[] rawJsonBytes) {
+ this.rawJsonBytes = rawJsonBytes;
+ return this;
+ }
+
+ public Builder operation(final Operation operation) {
+ this.operation = operation;
+ return this;
+ }
+
+ public Builder script(final Map<String, Object> script) {
+ this.script = script;
+ return this;
+ }
+
+ public Builder scriptedUpsert(final boolean scriptedUpsert) {
+ this.scriptedUpsert = scriptedUpsert;
+ return this;
+ }
+
+ public Builder dynamicTemplates(final Map<String, Object>
dynamicTemplates) {
+ this.dynamicTemplates = dynamicTemplates;
+ return this;
+ }
+
+ public Builder headerFields(final Map<String, String> headerFields) {
+ this.headerFields = headerFields;
+ return this;
+ }
+
+ public IndexOperationRequest build() {
+ Objects.requireNonNull(index, "Index required");
+ Objects.requireNonNull(operation, "Operation required");
+ return new IndexOperationRequest(this);
+ }
}
public String getIndex() {
@@ -67,6 +146,10 @@ public class IndexOperationRequest {
return fields;
}
+ public byte[] getRawJsonBytes() {
+ return rawJsonBytes;
+ }
+
public Operation getOperation() {
return operation;
}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index dbd22cded5e..4b9aa9e6216 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
@@ -68,6 +69,7 @@ import org.elasticsearch.client.sniff.Sniffer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URI;
@@ -202,7 +204,6 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
private void createObjectMapper(final ConfigurationContext context) {
mapper = new ObjectMapper();
if
(ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
{
- mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
}
prettyPrintWriter = mapper.writerWithDefaultPrettyPrinter();
@@ -624,6 +625,19 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return bulk(Collections.singletonList(operation),
elasticsearchRequestOptions);
}
+ /**
+ * ByteArrayOutputStream subclass that exposes the internal buffer without
a defensive copy,
+ * allowing zero-copy construction of ByteArrayEntity.
+ */
+ private static final class ExposedByteArrayOutputStream extends
ByteArrayOutputStream {
+ byte[] buf() {
+ return buf;
+ }
+ int count() {
+ return count;
+ }
+ }
+
private String flatten(final String str) {
return str.replaceAll("[\\n\\r]", "\\\\n");
}
@@ -656,13 +670,18 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return
flatten(mapper.writeValueAsString(Collections.singletonMap(operation,
operationBody)));
}
- protected void buildRequest(final IndexOperationRequest request, final
StringBuilder builder) throws JsonProcessingException {
+ protected void buildRequest(final IndexOperationRequest request, final
OutputStream out) throws IOException {
final String header = buildBulkHeader(request);
- builder.append(header).append("\n");
+ out.write(header.getBytes(StandardCharsets.UTF_8));
+ out.write('\n');
switch (request.getOperation()) {
case Index, Create:
- final String indexDocument =
mapper.writeValueAsString(request.getFields());
- builder.append(indexDocument).append("\n");
+ if (request.getRawJsonBytes() != null) {
+ out.write(request.getRawJsonBytes());
+ } else {
+ mapper.writeValue(out, request.getFields());
+ }
+ out.write('\n');
break;
case Update, Upsert:
final Map<String, Object> updateBody = new HashMap<>(2, 1);
@@ -678,9 +697,9 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
updateBody.put("doc_as_upsert", true);
}
}
-
final String update =
flatten(mapper.writeValueAsString(updateBody)).trim();
- builder.append(update).append("\n");
+ out.write(update.getBytes(StandardCharsets.UTF_8));
+ out.write('\n');
break;
case Delete:
// nothing to do for Delete operations, it just needs the
header
@@ -691,15 +710,15 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
@Override
public IndexOperationResponse bulk(final List<IndexOperationRequest>
operations, final ElasticsearchRequestOptions elasticsearchRequestOptions) {
try {
- final StringBuilder payload = new StringBuilder();
+ final ExposedByteArrayOutputStream payload = new
ExposedByteArrayOutputStream();
for (final IndexOperationRequest or : operations) {
buildRequest(or, payload);
}
if (getLogger().isDebugEnabled()) {
- getLogger().debug("{}", payload);
+ getLogger().debug("{}",
payload.toString(StandardCharsets.UTF_8));
}
- final HttpEntity entity = new NStringEntity(payload.toString(),
ContentType.APPLICATION_JSON);
+ final HttpEntity entity = new ByteArrayEntity(payload.buf(), 0,
payload.count(), ContentType.APPLICATION_JSON);
final StopWatch watch = new StopWatch();
watch.start();
final Response response = performRequest("POST", "/_bulk",
elasticsearchRequestOptions, entity);
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 88e71407656..da85c07056d 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -599,7 +599,8 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
assertNotNull(doc);
} finally {
// replace the deleted doc
- service.add(new IndexOperationRequest(INDEX, type, "1",
originalDoc, IndexOperationRequest.Operation.Index, null, false, null, null),
+ service.add(IndexOperationRequest.builder()
+
.index(INDEX).type(type).id("1").fields(originalDoc).operation(IndexOperationRequest.Operation.Index).build(),
new ElasticsearchRequestOptions(null, Map.of("Accept",
"application/json")));
waitForIndexRefresh(); // (affects later tests using _search or
_bulk)
}
@@ -719,7 +720,8 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
suppressNulls(false);
IndexOperationResponse response = service.bulk(
Collections.singletonList(
- new IndexOperationRequest("nulls", type, "1", doc,
IndexOperationRequest.Operation.Index, null, false, null, null)
+ IndexOperationRequest.builder()
+
.index("nulls").type(type).id("1").fields(doc).operation(IndexOperationRequest.Operation.Index).build()
),
new ElasticsearchRequestOptions(null, Map.of("Accept",
"application/json"))
);
@@ -731,7 +733,8 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
// suppress nulls
suppressNulls(true);
- response = service.bulk(Collections.singletonList(new
IndexOperationRequest("nulls", type, "2", doc,
IndexOperationRequest.Operation.Index, null, false, null, null)), null);
+ response =
service.bulk(Collections.singletonList(IndexOperationRequest.builder()
+
.index("nulls").type(type).id("2").fields(doc).operation(IndexOperationRequest.Operation.Index).build()),
null);
assertNotNull(response);
waitForIndexRefresh();
@@ -757,12 +760,12 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
final List<IndexOperationRequest> payload = new ArrayList<>();
for (int x = 0; x < 20; x++) {
final String index = x % 2 == 0 ? "bulk_a" : "bulk_b";
- payload.add(new IndexOperationRequest(index, type,
String.valueOf(x), Map.of("msg", "test"),
- IndexOperationRequest.Operation.Index, null, false, null,
null));
+ payload.add(IndexOperationRequest.builder()
+
.index(index).type(type).id(String.valueOf(x)).fields(Map.of("msg",
"test")).operation(IndexOperationRequest.Operation.Index).build());
}
for (int x = 0; x < 5; x++) {
- payload.add(new IndexOperationRequest("bulk_c", type,
String.valueOf(x), Map.of("msg", "test"),
- IndexOperationRequest.Operation.Index, null, false, null,
null));
+ payload.add(IndexOperationRequest.builder()
+
.index("bulk_c").type(type).id(String.valueOf(x)).fields(Map.of("msg",
"test")).operation(IndexOperationRequest.Operation.Index).build());
}
final IndexOperationResponse response = service.bulk(payload, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
assertNotNull(response);
@@ -794,12 +797,14 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
final List<IndexOperationRequest> payload = new ArrayList<>();
for (int x = 0; x < 20; x++) {
final String index = x % 2 == 0 ? "bulk_a" : "bulk_b";
- payload.add(new IndexOperationRequest(index, type,
String.valueOf(x), new MapBuilder().of("msg", "test").build(),
- IndexOperationRequest.Operation.Index, null, false, null,
Collections.singletonMap("retry_on_conflict", "3")));
+ payload.add(IndexOperationRequest.builder()
+ .index(index).type(type).id(String.valueOf(x)).fields(new
MapBuilder().of("msg", "test").build())
+
.operation(IndexOperationRequest.Operation.Index).headerFields(Collections.singletonMap("retry_on_conflict",
"3")).build());
}
for (int x = 0; x < 5; x++) {
- payload.add(new IndexOperationRequest("bulk_c", type,
String.valueOf(x), new MapBuilder().of("msg", "test").build(),
- IndexOperationRequest.Operation.Index, null, false, null,
null));
+ payload.add(IndexOperationRequest.builder()
+
.index("bulk_c").type(type).id(String.valueOf(x)).fields(new
MapBuilder().of("msg", "test").build())
+ .operation(IndexOperationRequest.Operation.Index).build());
}
final IndexOperationResponse response = service.bulk(payload, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
assertNotNull(response);
@@ -827,8 +832,9 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
@Test
void testUnknownBulkHeader() {
- final IndexOperationRequest failingRequest = new
IndexOperationRequest("bulk_c", type, "1", new MapBuilder().of("msg",
"test").build(),
- IndexOperationRequest.Operation.Index, null, false, null,
Collections.singletonMap("not_exist", "true"));
+ final IndexOperationRequest failingRequest =
IndexOperationRequest.builder()
+ .index("bulk_c").type(type).id("1").fields(new
MapBuilder().of("msg", "test").build())
+
.operation(IndexOperationRequest.Operation.Index).headerFields(Collections.singletonMap("not_exist",
"true")).build();
final ElasticsearchException ee =
assertThrows(ElasticsearchException.class, () -> service.add(failingRequest,
null));
assertInstanceOf(ResponseException.class, ee.getCause());
assertTrue(ee.getCause().getMessage().contains("Action/metadata line
[1] contains an unknown parameter [not_exist]"));
@@ -837,8 +843,9 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
@Test
void testDynamicTemplates() {
final List<IndexOperationRequest> payload = Collections.singletonList(
- new IndexOperationRequest("dynamo", type, "1", new
MapBuilder().of("msg", "test", "hello", "world").build(),
- IndexOperationRequest.Operation.Index, null, false,
new MapBuilder().of("hello", "test_text").build(), null)
+ IndexOperationRequest.builder()
+ .index("dynamo").type(type).id("1").fields(new
MapBuilder().of("msg", "test", "hello", "world").build())
+
.operation(IndexOperationRequest.Operation.Index).dynamicTemplates(new
MapBuilder().of("hello", "test_text").build()).build()
);
final IndexOperationResponse response = service.bulk(payload, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
@@ -862,7 +869,8 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
final Map<String, Object> doc = new HashMap<>();
doc.put("msg", "Buongiorno, mondo");
doc.put("counter", 1);
- service.add(new IndexOperationRequest(INDEX, type, testId, doc,
IndexOperationRequest.Operation.Index, null, false, null, null),
+ service.add(IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(testId).fields(doc).operation(IndexOperationRequest.Operation.Index).build(),
new ElasticsearchRequestOptions(Map.of("refresh", "true"),
null));
Map<String, Object> result = service.get(INDEX, type, testId,
null);
assertEquals(doc, result, "Not the same");
@@ -872,7 +880,8 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
final Map<String, Object> merged = new HashMap<>();
merged.putAll(updates);
merged.putAll(doc);
- IndexOperationRequest request = new IndexOperationRequest(INDEX,
type, testId, updates, IndexOperationRequest.Operation.Update, null, false,
null, null);
+ IndexOperationRequest request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(testId).fields(updates).operation(IndexOperationRequest.Operation.Update).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
result = service.get(INDEX, type, testId, null);
assertTrue(result.containsKey("from"));
@@ -884,7 +893,8 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
upsertItems.put("upsert_1", "hello");
upsertItems.put("upsert_2", 1);
upsertItems.put("upsert_3", true);
- request = new IndexOperationRequest(INDEX, type, upsertedId,
upsertItems, IndexOperationRequest.Operation.Upsert, null, false, null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(upsertedId).fields(upsertItems).operation(IndexOperationRequest.Operation.Upsert).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
result = service.get(INDEX, type, upsertedId, null);
assertEquals(upsertItems, result);
@@ -896,13 +906,15 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
script.put("lang", "painless");
script.put("params", Collections.singletonMap("count", 2));
// apply script to existing document
- request = new IndexOperationRequest(INDEX, type, testId,
upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(testId).fields(upsertDoc).operation(IndexOperationRequest.Operation.Upsert).script(script).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
result = service.get(INDEX, type, testId, new
ElasticsearchRequestOptions());
assertEquals(doc.get("msg"), result.get("msg"));
assertEquals(3, result.get("counter"));
// index document that doesn't already exist (don't apply script)
- request = new IndexOperationRequest(INDEX, type, upsertScriptId,
upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(upsertScriptId).fields(upsertDoc).operation(IndexOperationRequest.Operation.Upsert).script(script).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
result = service.get(INDEX, type, upsertScriptId, new
ElasticsearchRequestOptions(null, null));
assertNull(result.get("counter"));
@@ -914,31 +926,38 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
upsertScript.put("lang", "painless");
upsertScript.put("params", Collections.singletonMap("count", 2));
// no script execution if doc found (without scripted_upsert)
- request = new IndexOperationRequest(INDEX, type, scriptedUpsertId,
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, false,
null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert).script(upsertScript).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
assertFalse(service.documentExists(INDEX, type, scriptedUpsertId,
new ElasticsearchRequestOptions(null, Map.of("Accept", "application/json"))));
// script execution with no doc found (with scripted_upsert) - doc
not create, no "upsert" doc provided (empty objects suppressed)
suppressNulls(true);
- request = new IndexOperationRequest(INDEX, type, scriptedUpsertId,
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true,
null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert)
+ .script(upsertScript).scriptedUpsert(true).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
assertFalse(service.documentExists(INDEX, type, scriptedUpsertId,
null));
// script execution with no doc found (with scripted_upsert) - doc
created, empty "upsert" doc provided
suppressNulls(false);
- request = new IndexOperationRequest(INDEX, type, scriptedUpsertId,
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true,
null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert)
+ .script(upsertScript).scriptedUpsert(true).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
result = service.get(INDEX, type, scriptedUpsertId, new
ElasticsearchRequestOptions(null, null));
assertEquals(2, result.get("counter"));
// script execution with no doc found (with scripted_upsert) - doc
updated
- request = new IndexOperationRequest(INDEX, type, scriptedUpsertId,
emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true,
null, null);
+ request = IndexOperationRequest.builder()
+
.index(INDEX).type(type).id(scriptedUpsertId).fields(emptyUpsertDoc).operation(IndexOperationRequest.Operation.Upsert)
+ .script(upsertScript).scriptedUpsert(true).build();
service.add(request, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
result = service.get(INDEX, type, scriptedUpsertId, new
ElasticsearchRequestOptions());
assertEquals(4, result.get("counter"));
} finally {
final List<IndexOperationRequest> deletes = new ArrayList<>();
- deletes.add(new IndexOperationRequest(INDEX, type, testId, null,
IndexOperationRequest.Operation.Delete, null, false, null, null));
- deletes.add(new IndexOperationRequest(INDEX, type, upsertedId,
null, IndexOperationRequest.Operation.Delete, null, false, null, null));
- deletes.add(new IndexOperationRequest(INDEX, type, upsertScriptId,
null, IndexOperationRequest.Operation.Delete, null, false, null, null));
- deletes.add(new IndexOperationRequest(INDEX, type,
scriptedUpsertId, null, IndexOperationRequest.Operation.Delete, null, false,
null, null));
+
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(testId).operation(IndexOperationRequest.Operation.Delete).build());
+
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(upsertedId).operation(IndexOperationRequest.Operation.Delete).build());
+
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(upsertScriptId).operation(IndexOperationRequest.Operation.Delete).build());
+
deletes.add(IndexOperationRequest.builder().index(INDEX).type(type).id(scriptedUpsertId).operation(IndexOperationRequest.Operation.Delete).build());
assertFalse(service.bulk(deletes, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null)).hasErrors());
waitForIndexRefresh(); // wait 1s for index refresh (doesn't
prevent GET but affects later tests using _search or _bulk)
assertFalse(service.documentExists(INDEX, type, testId, null));
@@ -952,12 +971,12 @@ class ElasticSearchClientService_IT extends
AbstractElasticsearch_IT {
@Test
void testGetBulkResponsesWithErrors() {
final List<IndexOperationRequest> ops = Arrays.asList(
- new IndexOperationRequest(INDEX, type, "1", new
MapBuilder().of("msg", "one", "intField", 1).build(),
- IndexOperationRequest.Operation.Index, null, false,
null, null), // OK
- new IndexOperationRequest(INDEX, type, "2", new
MapBuilder().of("msg", "two", "intField", 1).build(),
- IndexOperationRequest.Operation.Create, null, false,
null, null), // already exists
- new IndexOperationRequest(INDEX, type, "1", new
MapBuilder().of("msg", "one", "intField", "notaninteger").build(),
- IndexOperationRequest.Operation.Index, null, false,
null, null) // can't parse int field
+ IndexOperationRequest.builder().index(INDEX).type(type).id("1")
+ .fields(new MapBuilder().of("msg", "one", "intField",
1).build()).operation(IndexOperationRequest.Operation.Index).build(), // OK
+ IndexOperationRequest.builder().index(INDEX).type(type).id("2")
+ .fields(new MapBuilder().of("msg", "two", "intField",
1).build()).operation(IndexOperationRequest.Operation.Create).build(), //
already exists
+ IndexOperationRequest.builder().index(INDEX).type(type).id("1")
+ .fields(new MapBuilder().of("msg", "one", "intField",
"notaninteger").build()).operation(IndexOperationRequest.Operation.Index).build()
// can't parse int field
);
final IndexOperationResponse response = service.bulk(ops, new
ElasticsearchRequestOptions(Map.of("refresh", "true"), null));
assertTrue(response.hasErrors());
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/InputFormat.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/InputFormat.java
new file mode 100644
index 00000000000..06ae48c68e4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/InputFormat.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum InputFormat implements DescribedValue {
+ NDJSON("NDJSON", "NDJSON", "One JSON object per line
(newline-delimited)."),
+ JSON_ARRAY("JSON Array", "JSON Array", "A top-level JSON array of objects,
streamed element-by-element for memory efficiency."),
+ SINGLE_JSON("Single JSON", "Single JSON", "The entire FlowFile is a single
JSON document.");
+
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ InputFormat(final String value, final String displayName, final String
description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ public static InputFormat fromValue(final String value) {
+ for (final InputFormat format : values()) {
+ if (format.value.equals(value)) {
+ return format;
+ }
+ }
+ throw new IllegalArgumentException("Unknown Input Format: " + value);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
index b52bbc4c7f2..2b10871da38 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -17,8 +17,15 @@
package org.apache.nifi.processors.elasticsearch;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.commons.io.IOUtils;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -32,6 +39,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions;
@@ -41,6 +49,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.RelationshipConfiguration;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -48,20 +57,33 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
-@Tags({"json", "elasticsearch", "elasticsearch7", "elasticsearch8",
"elasticsearch9", "put", "index"})
-@CapabilityDescription("An Elasticsearch put processor that uses the official
Elastic REST client libraries. " +
- "Each FlowFile is treated as a document to be sent to the
Elasticsearch _bulk API. Multiple FlowFiles can be batched together into each
Request sent to Elasticsearch.")
+@Tags({"ndjson", "json", "json array", "elasticsearch", "elasticsearch7",
"elasticsearch8", "elasticsearch9", "put", "index"})
+@CapabilityDescription("An Elasticsearch put processor that uses the official
Elastic REST client libraries to write JSON data. " +
+ "Supports three input formats selectable via the \"Input Content
Format\" property: " +
+ "NDJSON (one JSON object per line), JSON Array (a top-level array of
objects, streamed for memory efficiency), " +
+ "and Single JSON (the entire FlowFile is one document). " +
+ "FlowFiles are accumulated up to the configured Max Batch Size and
flushed to Elasticsearch in _bulk API requests. " +
+ "Large files that exceed the batch size are automatically split into
multiple _bulk requests.")
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.put.error",
description = "The error message if there is an issue parsing
the FlowFile, sending the parsed document to Elasticsearch or parsing the
Elasticsearch response"),
@@ -95,17 +117,9 @@ import java.util.Set;
})
@SystemResourceConsideration(
resource = SystemResource.MEMORY,
- description = "The Batch of FlowFiles will be stored in memory until
the bulk operation is performed.")
+ description = "At most one batch's worth of data will be buffered in
memory at a time. " +
+ "The maximum memory usage is bounded by the Max Batch Size
property.")
public class PutElasticsearchJson extends AbstractPutElasticsearch {
- static final PropertyDescriptor ID_ATTRIBUTE = new
PropertyDescriptor.Builder()
- .name("Identifier Attribute")
- .description("The name of the FlowFile attribute containing the
identifier for the document. If the Index Operation is \"index\", "
- + "this property may be left empty or evaluate to an empty
value, in which case the document's identifier will be "
- + "auto-generated by Elasticsearch. For all other Index
Operations, the attribute must evaluate to a non-empty value.")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
- .build();
static final PropertyDescriptor SCRIPT = new PropertyDescriptor.Builder()
.name("Script")
@@ -117,13 +131,14 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
static final PropertyDescriptor SCRIPTED_UPSERT = new
PropertyDescriptor.Builder()
.name("Scripted Upsert")
- .description("Whether to add the scripted_upsert flag to the
Upsert Operation. " +
+ .description("Whether to add the scripted_upsert flag to the
Upsert operation. " +
"If true, forces Elasticsearch to execute the Script
whether or not the document exists, defaults to false. " +
- "If the Upsert Document provided (from FlowFile content)
will be empty, but sure to set the " +
- CLIENT_SERVICE.getDisplayName() + " controller service's "
+ ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName() +
- " to " +
ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + " or no \"upsert\"
doc will be, " +
- "included in the request to Elasticsearch and the
operation will not create a new document for the script " +
- "to execute against, resulting in a \"not_found\" error")
+ "If the upsert document will be empty, ensure null
suppression is disabled so that an empty \"upsert\" doc " +
+ "is included in the request — otherwise Elasticsearch will
not create a new document for the script to " +
+ "execute against, resulting in a \"not_found\" error. " +
+ "For Single JSON mode, set the " +
CLIENT_SERVICE.getDisplayName() + " controller service's " +
+ ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName()
+ " to " + ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + ". " +
+ "For NDJSON and JSON Array modes, set the Suppress Nulls
property on this processor to Never Suppress.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
@@ -145,39 +160,171 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
.required(true)
.build();
+ static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Max Batch Size")
+ .description("""
+ The maximum amount of data to send in a single
Elasticsearch _bulk API request. \
+ For NDJSON and JSON Array modes, FlowFiles are accumulated
until this threshold is reached, then flushed. \
+ For Single JSON mode, this acts as a size-based safety
limit: if the accumulated FlowFiles exceed this size \
+ before Max FlowFiles Per Batch is reached, the request is
flushed early. \
+ Elasticsearch recommends 5-15 MB per bulk request for
optimal performance. \
+ To disable the size-based limit, check "Set Empty String".\
+ """)
+ .defaultValue("10 MB")
+ .addValidator((subject, input, context) ->
StringUtils.isBlank(input)
+ ? new
ValidationResult.Builder().subject(subject).valid(true).build()
+ : StandardValidators.DATA_SIZE_VALIDATOR.validate(subject,
input, context))
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor INPUT_FORMAT = new
PropertyDescriptor.Builder()
+ .name("Input Content Format")
+ .description("""
+ The format of the JSON content in each FlowFile. \
+ NDJSON: one JSON object per line (newline-delimited). \
+ JSON Array: a top-level JSON array of objects, streamed
element-by-element for memory efficiency. \
+ Single JSON: the entire FlowFile is a single JSON
document.\
+ """)
+ .allowableValues(InputFormat.class)
+ .defaultValue(InputFormat.SINGLE_JSON.getValue())
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE)
+ .name("Max FlowFiles Per Batch")
+ .displayName("Max FlowFiles Per Batch")
+ .description("""
+ The maximum number of FlowFiles to include in a single
Elasticsearch _bulk API request. \
+ If the accumulated FlowFiles exceed Max Batch Size before
this count is reached, the request will be flushed early.\
+ """)
+ .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+ .build();
+
+ static final PropertyDescriptor SUPPRESS_NULLS = new
PropertyDescriptor.Builder()
+ .name("Suppress Nulls")
+ .description("""
+ When set to Always Suppress, null and empty values are
removed from documents before they are sent to Elasticsearch.
+ This setting applies to NDJSON and JSON Array formats for
Index and Create operations only. \
+ For Single JSON, configure null suppression on the
controller service instead.
+ Performance note: for JSON Array the impact is negligible
since documents are already being parsed. \
+ For NDJSON, each line must be parsed and re-serialized
when suppression is enabled, \
+ "which adds overhead compared to the default behavior of
passing lines through as raw bytes.\
+ """)
+ .allowableValues(ElasticSearchClientService.NEVER_SUPPRESS,
ElasticSearchClientService.ALWAYS_SUPPRESS)
+ .defaultValue(ElasticSearchClientService.NEVER_SUPPRESS)
+ .required(true)
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON,
InputFormat.JSON_ARRAY)
+ .build();
+
+ static final PropertyDescriptor ID_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("Identifier Attribute")
+ .description("""
+ The name of the FlowFile attribute containing the
identifier for the document. \
+ If the Index Operation is "index", this property may be
left empty or evaluate to an empty value, \
+ in which case the document's identifier will be
auto-generated by Elasticsearch. \
+ For all other Index Operations, the attribute must
evaluate to a non-empty value.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .dependsOn(INPUT_FORMAT, InputFormat.SINGLE_JSON)
+ .build();
+
+ static final PropertyDescriptor IDENTIFIER_FIELD = new
PropertyDescriptor.Builder()
+ .name("Identifier Field")
+ .description("""
+ The name of the field within each document to use as the
Elasticsearch document ID. \
+ If the field is not present in a document or this property
is left blank, no document ID is set \
+ and Elasticsearch will auto-generate one.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON,
InputFormat.JSON_ARRAY)
+ .build();
+
+ static final Relationship REL_BULK_REQUEST = new Relationship.Builder()
+ .name("bulk_request")
+ .description("When \"Output Bulk Request\" is enabled, the raw
Elasticsearch _bulk API request body is written " +
+ "to this relationship as a FlowFile for inspection or
debugging.")
+ .build();
+
+ static final PropertyDescriptor OUTPUT_BULK_REQUEST = new
PropertyDescriptor.Builder()
+ .name("Output Bulk Request")
+ .description("If enabled, each Elasticsearch _bulk request body is
written as a FlowFile to the \"" +
+ REL_BULK_REQUEST.getName() + "\" relationship. Useful for
debugging. " +
+ "Each FlowFile contains the full NDJSON body exactly as
sent to Elasticsearch.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
static final List<PropertyDescriptor> DESCRIPTORS = List.of(
- ID_ATTRIBUTE,
INDEX_OP,
INDEX,
TYPE,
SCRIPT,
SCRIPTED_UPSERT,
DYNAMIC_TEMPLATES,
+ INPUT_FORMAT,
BATCH_SIZE,
+ MAX_BATCH_SIZE,
+ SUPPRESS_NULLS,
+ ID_ATTRIBUTE,
+ IDENTIFIER_FIELD,
CHARSET,
MAX_JSON_FIELD_STRING_LENGTH,
CLIENT_SERVICE,
LOG_ERROR_RESPONSES,
OUTPUT_ERROR_RESPONSES,
+ OUTPUT_BULK_REQUEST,
NOT_FOUND_IS_SUCCESSFUL
);
+
static final Set<Relationship> BASE_RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL,
REL_ERRORS);
+ private static final int READER_BUFFER_SIZE = 65536;
+
+ private final AtomicBoolean bulkRequestOutputEnabled = new
AtomicBoolean(false);
+ private boolean outputBulkRequest;
+ private ObjectReader mapReader;
+ private ObjectWriter suppressingWriter;
+
@Override
Set<Relationship> getBaseRelationships() {
return BASE_RELATIONSHIPS;
}
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+ if (bulkRequestOutputEnabled.get()) {
+ rels.add(REL_BULK_REQUEST);
+ }
+ return rels;
+ }
+
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+ bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+ }
+ }
+
@Override
public void migrateProperties(final PropertyConfiguration config) {
super.migrateProperties(config);
+ // Migrate legacy property names from earlier versions of PutElasticsearchJson
config.removeProperty("put-es-json-error-documents");
config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
config.renameProperty("put-es-json-script", SCRIPT.getName());
@@ -185,12 +332,31 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
config.renameProperty("put-es-json-dynamic_templates",
DYNAMIC_TEMPLATES.getName());
config.renameProperty("put-es-json-charset", CHARSET.getName());
config.renameProperty("put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+ // If INPUT_FORMAT was not explicitly set, this flow was migrated from an
earlier version of PutElasticsearchJson — default to Single JSON.
+ // Set this before migrating BATCH_SIZE so its dependsOn condition is
satisfied when NiFi processes the property.
+ if (!config.hasProperty(INPUT_FORMAT.getName())) {
+ config.setProperty(INPUT_FORMAT.getName(),
InputFormat.SINGLE_JSON.getValue());
+ }
+
+ // Migrate "Batch Size" (from PutElasticsearchJson) to the new name
used in this processor.
+ // INPUT_FORMAT is set first above so its dependsOn condition is
satisfied when NiFi processes BATCH_SIZE.
+ config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(),
BATCH_SIZE.getName());
+
+ // MAX_BATCH_SIZE is a new property — existing configurations had no
byte-based limit.
+ // Preserve that behavior on upgrade by leaving the property blank
(unbounded).
+ // The custom validator on MAX_BATCH_SIZE accepts blank values, so
this will not be
+ // overwritten by the default of 10 MB. New processor instances get
the 10 MB default.
+ if (!config.hasProperty(MAX_BATCH_SIZE.getName())) {
+ config.setProperty(MAX_BATCH_SIZE.getName(), "");
+ }
}
@Override
public void migrateRelationships(final RelationshipConfiguration config) {
super.migrateRelationships(config);
+ // PutElasticsearchJson used "success" before it was renamed to
"original"
config.renameRelationship("success",
AbstractPutElasticsearch.REL_ORIGINAL.getName());
}
@@ -198,130 +364,637 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
-
this.notFoundIsSuccessful =
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+ this.outputBulkRequest =
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+ this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+ this.mapReader = mapper.readerFor(new TypeReference<Map<String,
Object>>() { });
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ if (inputFormat != InputFormat.SINGLE_JSON
+ &&
ElasticSearchClientService.ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue()))
{
+ final ObjectMapper suppressingMapper = mapper.copy()
+
.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
+ this.suppressingWriter = suppressingMapper.writer();
+ } else {
+ this.suppressingWriter = null;
+ }
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final int batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
- final List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles.isEmpty()) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
return;
}
+ final String maxBatchSizeStr =
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().getValue();
+ final long maxBatchBytes = StringUtils.isNotBlank(maxBatchSizeStr)
+ ?
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue()
+ : Long.MAX_VALUE;
+ final InputFormat inputFormat =
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+ final String idAttribute = inputFormat == InputFormat.SINGLE_JSON
+ ? context.getProperty(ID_ATTRIBUTE).getValue()
+ : null;
+ final String documentIdField = inputFormat != InputFormat.SINGLE_JSON
+ ?
context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions().getValue()
+ : null;
+ final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+ ?
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+ : Integer.MAX_VALUE;
+ int flowFilesProcessed = 0;
+
+ // Tracks all FlowFiles that were successfully parsed and submitted
(even partially)
+ final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+ // Tracks FlowFiles that had at least one Elasticsearch document error
+ final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+ // Deferred bulk-error attributes: applied after each FlowFile's
InputStream is closed
+ final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new
HashMap<>();
+ // Failed local record indices per FlowFile — O(error count) memory;
used at finalization
+ // to re-read FlowFiles once and reconstruct error/success content
without buffering all bytes
+ final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new
LinkedHashMap<>();
+
+ // Current chunk accumulation — operationFlowFiles and
operationRecordIndices are parallel to operations (same index)
+ final List<FlowFile> operationFlowFiles = new ArrayList<>();
+ final List<IndexOperationRequest> operations = new ArrayList<>();
+ final List<Integer> operationRecordIndices = new ArrayList<>();
+ long totalBytesAccumulated = 0;
+ long chunkBytes = 0;
+
+ while (flowFile != null) {
+ final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ final String flowFileIdAttribute =
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
+
+ final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, flowFile);
+ final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+ final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+ final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, flowFile);
+ final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
+
+ boolean parseError = false;
+ try {
+ final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
+ if (InputFormat.NDJSON == inputFormat) {
+ // NDJSON: each non-blank line is one JSON document.
+ // Index/Create operations pass raw UTF-8 bytes directly
to avoid Map allocation.
+ // Update/Delete/Upsert parse into a Map so the bulk
serializer can wrap the payload.
+ int localRecordIndex = 0;
+ try (final BufferedReader reader = new BufferedReader(
+ new InputStreamReader(session.read(flowFile),
charset), READER_BUFFER_SIZE)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String trimmedLine = line.trim();
+ if (trimmedLine.isEmpty()) {
+ continue;
+ }
+ final IndexOperationRequest opRequest;
+ final long docBytes;
+ if (o == IndexOperationRequest.Operation.Index ||
o == IndexOperationRequest.Operation.Create) {
+ final String id = extractId(trimmedLine,
documentIdField, flowFileIdAttribute);
+ final byte[] rawJsonBytes;
+ if (suppressingWriter != null) {
+ // Parse to Map so NON_NULL/NON_EMPTY
inclusion filters apply during serialization.
+ // JsonNode tree serialization bypasses
JsonInclude filters.
+ rawJsonBytes =
suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+ } else {
+ rawJsonBytes =
trimmedLine.getBytes(StandardCharsets.UTF_8);
+ }
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes = rawJsonBytes.length;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(trimmedLine);
+ final String id = resolveId(contentMap,
documentIdField, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ docBytes = trimmedLine.length();
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } else if (InputFormat.JSON_ARRAY == inputFormat) {
+ // JSON Array: the FlowFile may contain one or more
top-level JSON arrays.
+ // Arrays are read sequentially; elements within each
array are streamed one at a
+ // time via JsonParser to avoid loading the entire content
into memory.
+ // Each element is re-serialized to compact bytes for
Index/Create,
+ // or parsed into a Map for Update/Delete/Upsert.
+ int localRecordIndex = 0;
+ try (final InputStreamReader isr = new
InputStreamReader(session.read(flowFile), charset);
+ final JsonParser parser =
mapper.getFactory().createParser(isr)) {
+ JsonToken outerToken;
+ while ((outerToken = parser.nextToken()) != null) {
+ if (outerToken != JsonToken.START_ARRAY) {
+ throw new IOException("Expected a JSON array
but found: " + outerToken);
+ }
+ JsonToken token;
+ while ((token = parser.nextToken()) !=
JsonToken.END_ARRAY) {
+ if (token == null) {
+ throw new IOException("Malformed JSON
Array: reached end of stream before the closing ']'. " +
+ "Verify the FlowFile content is a
complete, valid JSON array.");
+ }
+ final long startOffset =
parser.currentTokenLocation().getCharOffset();
+ final IndexOperationRequest opRequest;
+ if (o == IndexOperationRequest.Operation.Index
|| o == IndexOperationRequest.Operation.Create) {
+ final long docBytes;
+ final byte[] rawJsonBytes;
+ final String id;
+ if (suppressingWriter != null) {
+ // Parse directly to Map so
NON_NULL/NON_EMPTY inclusion filters apply during
+ // serialization. JsonNode tree
serialization bypasses JsonInclude filters,
+ // and convertValue(node, Map) adds an
extra serialization cycle.
+ final Map<String, Object> contentMap =
mapReader.readValue(parser);
+ docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ rawJsonBytes =
suppressingWriter.writeValueAsBytes(contentMap);
+ id = resolveId(contentMap,
documentIdField, flowFileIdAttribute);
+ } else {
+ final JsonNode node =
mapper.readTree(parser);
+ docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ rawJsonBytes =
mapper.writeValueAsBytes(node);
+ id = extractId(node, documentIdField,
flowFileIdAttribute);
+ }
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .rawJsonBytes(rawJsonBytes)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ } else {
+ final Map<String, Object> contentMap =
mapReader.readValue(parser);
+ final long docBytes = Math.max(1,
parser.currentLocation().getCharOffset() - startOffset);
+ final String id = resolveId(contentMap,
documentIdField, flowFileIdAttribute);
+ opRequest = IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ }
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(localRecordIndex++);
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ }
+ } else {
+ // Single JSON: the entire FlowFile is one document parsed
into a Map.
+ // The client service serializes the Map, preserving
null-suppression settings.
+ try (final InputStream in = session.read(flowFile)) {
+ final Map<String, Object> contentMap =
mapReader.readValue(in);
+ final String id =
StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
+ final IndexOperationRequest opRequest =
IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(o)
+ .script(scriptMap)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplatesMap)
+ .headerFields(bulkHeaderFields)
+ .build();
+ operations.add(opRequest);
+ operationFlowFiles.add(flowFile);
+ operationRecordIndices.add(0);
+ final long docBytes = flowFile.getSize();
+ chunkBytes += docBytes;
+ totalBytesAccumulated += docBytes;
+ if (chunkBytes >= maxBatchBytes) {
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
+ operations.clear();
+ operationFlowFiles.clear();
+ operationRecordIndices.clear();
+ chunkBytes = 0;
+ }
+ }
+ }
+ } catch (final ElasticsearchException ese) {
+ final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure.");
+ getLogger().error(msg, ese);
+ final Relationship rel = ese.isElastic() ? REL_RETRY :
REL_FAILURE;
+ // Route only the failing in-flight chunk to retry/failure.
FlowFiles already
+ // successfully indexed by prior _bulk requests are routed
normally to avoid
+ // duplicate indexing if those FlowFiles are re-processed on
retry.
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ese, rel, session, true,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
+ return;
+ } catch (final IOException ioe) {
+ getLogger().error("Could not read FlowFile content as valid
{}.", inputFormat, ioe);
+ removeFlowFileFromChunk(flowFile, operations,
operationFlowFiles, operationRecordIndices);
+ flowFile = session.putAttribute(flowFile,
"elasticsearch.put.error", ioe.getMessage());
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ parseError = true;
+ } catch (final Exception ex) {
+ getLogger().error("Failed processing records.", ex);
+ removeFlowFileFromChunk(flowFile, operations,
operationFlowFiles, operationRecordIndices);
+ flowFile = session.putAttribute(flowFile,
"elasticsearch.put.error", ex.getMessage());
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ parseError = true;
+ }
+
+ if (!parseError) {
+ allProcessedFlowFiles.add(flowFile);
+ // InputStream is now closed — safe to apply any deferred
bulk-error attributes
+ applyPendingBulkErrors(flowFile, pendingBulkErrors, session);
+ }
- final String idAttribute =
context.getProperty(ID_ATTRIBUTE).getValue();
+ flowFilesProcessed++;
- final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
- final List<IndexOperationRequest> operations = new
ArrayList<>(flowFiles.size());
+ // For Single JSON, stop when the count-based batch limit is
reached; otherwise use size limit
+ if (InputFormat.SINGLE_JSON == inputFormat ? flowFilesProcessed >=
batchSize : totalBytesAccumulated >= maxBatchBytes) {
+ break;
+ }
- for (final FlowFile input : flowFiles) {
- addOperation(operations, originals, idAttribute, context, session,
input);
+ flowFile = session.get();
}
- if (!originals.isEmpty()) {
+ // Flush any remaining operations (all InputStreams are closed at this
point)
+ if (!operations.isEmpty()) {
try {
- final List<FlowFile> errorDocuments =
indexDocuments(operations, originals, context, session);
- handleResponse(context, session, errorDocuments, originals);
- session.transfer(originals, REL_ORIGINAL);
+ flushChunk(operations, operationFlowFiles,
operationRecordIndices, errorFlowFiles, null, pendingBulkErrors,
pendingErrorRecordIndices, context, session);
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side
problem with Elasticsearch. %s",
- ese.isElastic() ? "Routing to retry." : "Routing to
failure");
+ ese.isElastic() ? "Routing to retry." : "Routing to
failure.");
getLogger().error(msg, ese);
final Relationship rel = ese.isElastic() ? REL_RETRY :
REL_FAILURE;
- transferFlowFilesOnException(ese, rel, session, true,
originals.toArray(new FlowFile[0]));
- } catch (final JsonProcessingException jpe) {
- getLogger().warn("Could not log Elasticsearch operation errors
nor determine which documents errored.", jpe);
- transferFlowFilesOnException(jpe, REL_ERRORS, session, true,
originals.toArray(new FlowFile[0]));
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ese, rel, session, true,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
+ return;
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
- transferFlowFilesOnException(ex, REL_FAILURE, session, false,
originals.toArray(new FlowFile[0]));
+ final Set<FlowFile> inFlight = new
LinkedHashSet<>(operationFlowFiles);
+ transferFlowFilesOnException(ex, REL_FAILURE, session, false,
inFlight.toArray(new FlowFile[0]));
+ final Set<FlowFile> alreadyIndexed = new
LinkedHashSet<>(allProcessedFlowFiles);
+ alreadyIndexed.removeAll(inFlight);
+ if (!alreadyIndexed.isEmpty()) {
+ handleFinalResponse(context, session, errorFlowFiles,
alreadyIndexed, pendingErrorRecordIndices, inputFormat);
+ }
context.yield();
+ return;
}
+ }
+
+ if (allProcessedFlowFiles.isEmpty()) {
+ getLogger().warn("No records successfully parsed for sending to
Elasticsearch");
} else {
- getLogger().warn("No FlowFiles successfully parsed for sending to
Elasticsearch");
+ handleFinalResponse(context, session, errorFlowFiles,
allProcessedFlowFiles, pendingErrorRecordIndices, inputFormat);
}
}
- private void addOperation(final List<IndexOperationRequest> operations,
final List<FlowFile> originals, final String idAttribute,
- final ProcessContext context, final
ProcessSession session, FlowFile input) {
- final String indexOp =
context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
- final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
- final String type =
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
- final String id = StringUtils.isNotBlank(idAttribute) &&
StringUtils.isNotBlank(input.getAttribute(idAttribute)) ?
input.getAttribute(idAttribute) : null;
-
- final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT,
context, input);
- final boolean scriptedUpsert =
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean();
- final Map<String, Object> dynamicTemplatesMap =
getMapFromAttribute(DYNAMIC_TEMPLATES, context, input);
-
- final Map<String, String> dynamicProperties =
getRequestParametersFromDynamicProperties(context, input);
- final Map<String, String> bulkHeaderFields =
getBulkHeaderParameters(dynamicProperties);
-
- final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();
-
- try (final InputStream inStream = session.read(input)) {
- final byte[] result = IOUtils.toByteArray(inStream);
- @SuppressWarnings("unchecked")
- final Map<String, Object> contentMap = mapper.readValue(new
String(result, charset), Map.class);
-
- final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
- operations.add(new IndexOperationRequest(index, type, id,
contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap,
bulkHeaderFields));
-
- originals.add(input);
- } catch (final IOException ioe) {
- getLogger().error("Could not read FlowFile content valid JSON.",
ioe);
- input = session.putAttribute(input, "elasticsearch.put.error",
ioe.getMessage());
- session.penalize(input);
- session.transfer(input, REL_FAILURE);
- } catch (final Exception ex) {
- getLogger().error("Could not index documents.", ex);
- input = session.putAttribute(input, "elasticsearch.put.error",
ex.getMessage());
- session.penalize(input);
- session.transfer(input, REL_FAILURE);
+ /**
+ * Removes all entries belonging to {@code target} from the three parallel
lists, so that a
+ * FlowFile that failed mid-read does not leave stale references that
would cause
+ * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}
on the next flush.
+ */
+ private void removeFlowFileFromChunk(final FlowFile target,
+ final List<IndexOperationRequest>
operations,
+ final List<FlowFile>
operationFlowFiles,
+ final List<Integer>
operationRecordIndices) {
+ for (int i = operations.size() - 1; i >= 0; i--) {
+ if (operationFlowFiles.get(i) == target) {
+ operations.remove(i);
+ operationFlowFiles.remove(i);
+ operationRecordIndices.remove(i);
+ }
}
}
- @SuppressWarnings("unchecked")
- private Map<String, Object> getMapFromAttribute(final PropertyDescriptor
propertyDescriptor, final ProcessContext context, final FlowFile input) {
- final String propertyValue =
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+ /**
+ * Applies any bulk-error attributes that were deferred for {@code
flowFile} while its
+ * InputStream was open. Must only be called after the InputStream has
been closed.
+ */
+ private void applyPendingBulkErrors(final FlowFile flowFile,
+ final Map<FlowFile, List<Map<String,
Object>>> pendingBulkErrors,
+ final ProcessSession session) {
+ final List<Map<String, Object>> errorList =
pendingBulkErrors.remove(flowFile);
+ if (errorList == null) {
+ return;
+ }
try {
- return StringUtils.isNotBlank(propertyValue) ?
mapper.readValue(propertyValue, Map.class) : Collections.emptyMap();
- } catch (final JsonProcessingException jpe) {
- throw new ProcessException(propertyDescriptor.getDisplayName() + "
must be a String parsable into a JSON Object", jpe);
+ final Object errorObj = errorList.size() == 1 ? errorList.get(0) :
errorList;
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
mapper.writeValueAsString(errorObj));
+ } catch (final JsonProcessingException e) {
+ session.putAttribute(flowFile, "elasticsearch.bulk.error",
+ String.format("{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+ e.getMessage().replace("\"", "\\\"")));
}
}
    /**
     * Sends the accumulated batch of operations to Elasticsearch via the _bulk API and records
     * any per-document errors. If {@code openStreamFlowFile} is non-null, error attributes for
     * that FlowFile are deferred into {@code pendingBulkErrors} so they are applied after its
     * InputStream is closed (calling {@code putAttribute} on an open FlowFile throws
     * {@link org.apache.nifi.processor.exception.FlowFileHandlingException}).
     * <p>
     * Only the local record index of each failed operation is stored (O(error count) memory).
     * Error/success content is reconstructed by re-reading the FlowFile in
     * {@link #handleFinalResponse}.
     */
    private void flushChunk(final List<IndexOperationRequest> operations, final List<FlowFile> operationFlowFiles,
                            final List<Integer> operationRecordIndices,
                            final Set<FlowFile> errorFlowFiles, final FlowFile openStreamFlowFile,
                            final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors,
                            final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices,
                            final ProcessContext context, final ProcessSession session) throws IOException {
        // Optional debug output: serialize the batch as a _bulk NDJSON FlowFile before sending
        if (outputBulkRequest) {
            outputBulkRequestFlowFile(operations, operationFlowFiles, session);
        }

        // Request parameters/headers are evaluated against the first FlowFile of the batch
        final FlowFile firstFlowFile = operationFlowFiles.get(0);
        final Map<String, String> dynamicProperties = getRequestParametersFromDynamicProperties(context, firstFlowFile);
        final IndexOperationResponse response = clientService.get().bulk(operations,
                new ElasticsearchRequestOptions(getRequestURLParameters(dynamicProperties), getRequestHeadersFromDynamicProperties(context, firstFlowFile)));
        final Map<Integer, Map<String, Object>> errors = findElasticsearchResponseErrors(response);

        // Attach per-FlowFile error attributes; defer putAttribute for the FlowFile whose
        // InputStream is currently open (putAttribute would throw FlowFileHandlingException).
        // Record only the local record index of each failure — content is reconstructed at finalization.
        final Map<FlowFile, List<Map<String, Object>>> flowFileErrors = new HashMap<>();
        errors.forEach((index, error) -> {
            // 'index' is the position within this batch; map it back to the owning FlowFile
            final FlowFile flowFile = operationFlowFiles.get(index);
            errorFlowFiles.add(flowFile);
            flowFileErrors.computeIfAbsent(flowFile, k -> new ArrayList<>()).add(error);
            pendingErrorRecordIndices.computeIfAbsent(flowFile, k -> new HashSet<>()).add(operationRecordIndices.get(index));
        });
        flowFileErrors.forEach((flowFile, errorList) -> {
            // Identity comparison is intentional: the exact open-stream reference is tracked
            if (flowFile == openStreamFlowFile) {
                // Defer: InputStream still open — apply after the try-with-resources closes
                pendingBulkErrors.computeIfAbsent(flowFile, k -> new ArrayList<>()).addAll(errorList);
            } else {
                try {
                    // Single error → bare object; multiple errors → JSON array
                    final Object errorObj = errorList.size() == 1 ? errorList.get(0) : errorList;
                    // NOTE(review): the FlowFile returned by putAttribute is not captured here —
                    // confirm downstream lookups rely on the session-tracked reference
                    session.putAttribute(flowFile, "elasticsearch.bulk.error", mapper.writeValueAsString(errorObj));
                } catch (final JsonProcessingException e) {
                    // Fall back to a hand-built error JSON when the response itself cannot be serialized
                    session.putAttribute(flowFile, "elasticsearch.bulk.error",
                            String.format("{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
                                    e.getMessage().replace("\"", "\\\"")));
                }
            }
        });
        if (!errors.isEmpty()) {
            handleElasticsearchDocumentErrors(errors, session, firstFlowFile);
        }
    }
+
    /**
     * Serializes the pending batch into an Elasticsearch _bulk NDJSON body and writes it to a
     * new FlowFile on the {@code bulk_request} relationship for inspection or debugging.
     * Each operation produces an action metadata line followed by a document line
     * (Delete operations have no document line). Failures here are logged and swallowed so
     * that a debug-output failure does not abort the actual indexing.
     */
    private void outputBulkRequestFlowFile(final List<IndexOperationRequest> operations,
                                           final List<FlowFile> operationFlowFiles,
                                           final ProcessSession session) {
        final FlowFile parent = operationFlowFiles.get(0);
        FlowFile bulkRequestFF = session.create(parent);
        try (final OutputStream out = session.write(bulkRequestFF)) {
            for (final IndexOperationRequest op : operations) {
                // Action metadata line — only non-blank/non-empty parts are emitted
                final Map<String, Object> actionBody = new LinkedHashMap<>();
                if (StringUtils.isNotBlank(op.getIndex())) {
                    actionBody.put("_index", op.getIndex());
                }
                if (StringUtils.isNotBlank(op.getType())) {
                    actionBody.put("_type", op.getType());
                }
                if (StringUtils.isNotBlank(op.getId())) {
                    actionBody.put("_id", op.getId());
                }
                if (op.getDynamicTemplates() != null && !op.getDynamicTemplates().isEmpty()) {
                    actionBody.put("dynamic_templates", op.getDynamicTemplates());
                }
                if (op.getHeaderFields() != null) {
                    actionBody.putAll(op.getHeaderFields());
                }
                // Upsert maps to "update" in the ES bulk API
                final String actionName = op.getOperation() == IndexOperationRequest.Operation.Upsert
                        ? "update" : op.getOperation().getValue();
                final Map<String, Object> actionLine = Map.of(actionName, actionBody);
                out.write(mapper.writeValueAsBytes(actionLine));
                out.write('\n');

                // Document line (delete has no document)
                if (op.getOperation() != IndexOperationRequest.Operation.Delete) {
                    if (op.getRawJsonBytes() != null) {
                        // Index/Create with rawJson path — write directly, no Map round-trip.
                        // NOTE(review): assumes rawJsonBytes is single-line JSON; an embedded
                        // newline would corrupt the NDJSON body — confirm upstream compaction
                        out.write(op.getRawJsonBytes());
                    } else {
                        final Map<String, Object> docLine = new LinkedHashMap<>();
                        if (op.getOperation() == IndexOperationRequest.Operation.Update
                                || op.getOperation() == IndexOperationRequest.Operation.Upsert) {
                            if (op.getScript() != null && !op.getScript().isEmpty()) {
                                // Scripted update/upsert body
                                docLine.put("script", op.getScript());
                                if (op.isScriptedUpsert()) {
                                    docLine.put("scripted_upsert", true);
                                }
                                if (op.getFields() != null && !op.getFields().isEmpty()) {
                                    docLine.put("upsert", op.getFields());
                                }
                            } else {
                                // Plain partial-document update; Upsert adds doc_as_upsert
                                docLine.put("doc", op.getFields());
                                if (op.getOperation() == IndexOperationRequest.Operation.Upsert) {
                                    docLine.put("doc_as_upsert", true);
                                }
                            }
                        } else {
                            // Index/Create without raw bytes: document fields become the line
                            docLine.putAll(op.getFields());
                        }
                        out.write(mapper.writeValueAsBytes(docLine));
                    }
                    out.write('\n');
                }
            }
        } catch (final IOException e) {
            // Debug output is best-effort: log, discard the partial FlowFile, keep indexing
            getLogger().warn("Failed to write bulk request FlowFile for inspection.", e);
            session.remove(bulkRequestFF);
            return;
        }
        bulkRequestFF = session.putAttribute(bulkRequestFF, "elasticsearch.bulk.operation.count", String.valueOf(operations.size()));
        session.transfer(bulkRequestFF, REL_BULK_REQUEST);
    }
- private void handleResponse(final ProcessContext context, final
ProcessSession session, final List<FlowFile> errorDocuments, final
List<FlowFile> originals) {
- // clone FlowFiles to be transferred to errors/successful as the
originals are pass through to REL_ORIGINAL
- final List<FlowFile> copiedErrors =
errorDocuments.stream().map(session::clone).toList();
+ /**
+ * Extracts the document ID from a raw JSON string using a streaming
parser.
+ * Stops as soon as the target field is found, avoiding a full tree parse.
+ * Used for Index/Create operations to avoid the Map allocation overhead.
+ */
+ private String extractId(final String rawJson, final String idAttribute,
final String flowFileIdAttribute) throws IOException {
+ if (StringUtils.isBlank(idAttribute)) {
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+ try (final JsonParser p = mapper.getFactory().createParser(rawJson)) {
+ while (p.nextToken() != null) {
+ if (idAttribute.equals(p.currentName()) && p.nextToken() !=
null && !p.currentToken().isStructStart()) {
+ final String value = p.getText();
+ return StringUtils.isNotBlank(value) ? value
+ : (StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null);
+ }
+ }
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Extracts the document ID from a pre-parsed JsonNode.
+ * Used for JSON Array Index/Create operations where the node is already
available.
+ */
+ private String extractId(final JsonNode node, final String idAttribute,
final String flowFileIdAttribute) {
+ if (StringUtils.isBlank(idAttribute)) {
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+ final JsonNode idNode = node.get(idAttribute);
+ if (idNode != null && !idNode.isNull()) {
+ return idNode.asText();
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Resolves the document ID for Update/Delete/Upsert operations from the
already-parsed content Map.
+ * Falls back to the FlowFile attribute value when the field is absent
from the document.
+ */
+ private String resolveId(final Map<String, Object> contentMap, final
String idAttribute, final String flowFileIdAttribute) {
+ if (StringUtils.isBlank(idAttribute)) {
+ return null;
+ }
+ final Object idObj = contentMap.get(idAttribute);
+ if (idObj != null) {
+ return idObj.toString();
+ }
+ return StringUtils.isNotBlank(flowFileIdAttribute) ?
flowFileIdAttribute : null;
+ }
+
+ /**
+ * Reads a processor property as a JSON Object string and deserializes it
into a Map.
+ * Returns an empty Map when the property is blank. Throws {@link
ProcessException} if the
+ * value is not valid JSON or not a JSON Object.
+ */
+ private Map<String, Object> getMapFromAttribute(final PropertyDescriptor
propertyDescriptor, final ProcessContext context, final FlowFile input) {
+ final String propertyValue =
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+ try {
+ return StringUtils.isNotBlank(propertyValue) ?
mapper.readValue(propertyValue, new TypeReference<Map<String, Object>>() { }) :
Collections.emptyMap();
+ } catch (final JsonProcessingException jpe) {
+ throw new ProcessException(propertyDescriptor.getDisplayName() + "
must be a String parsable into a JSON Object", jpe);
+ }
+ }
+
+ /**
+ * Routes all successfully processed FlowFiles to their final
relationships after all
+ * _bulk requests for the trigger have completed.
+ * <ul>
+ * <li>FlowFiles with no errors are cloned to {@code REL_SUCCESSFUL}
without re-reading.</li>
+ * <li>FlowFiles with errors are re-read exactly once to split records
by failed indices:
+ * failed records go to a clone on {@code REL_ERRORS}; successful
records (if any) go
+ * to a clone on {@code REL_SUCCESSFUL}. For SINGLE_JSON the
FlowFile is always a single
+ * document so a direct clone is used instead of re-reading.</li>
+ * <li>All FlowFiles go to {@code REL_ORIGINAL}.</li>
+ * </ul>
+ */
+ private void handleFinalResponse(final ProcessContext context, final
ProcessSession session,
+ final Set<FlowFile> errorFlowFiles, final
Set<FlowFile> allFlowFiles,
+ final Map<FlowFile, Set<Integer>>
pendingErrorRecordIndices,
+ final InputFormat inputFormat) {
+ final List<FlowFile> copiedErrors = new ArrayList<>();
+ final List<FlowFile> successfulDocuments = new ArrayList<>();
+
+ for (final FlowFile ff : allFlowFiles) {
+ if (!errorFlowFiles.contains(ff)) {
+ // All records succeeded: clone the original without re-reading
+ successfulDocuments.add(session.clone(ff));
+ } else if (inputFormat == InputFormat.SINGLE_JSON) {
+ // One document per FlowFile — it failed entirely; clone
directly
+ copiedErrors.add(session.clone(ff));
+ } else {
+ // NDJSON or JSON Array: re-read once and split records into
error/success by index
+ final Set<Integer> failedIndices =
pendingErrorRecordIndices.getOrDefault(ff, Collections.emptySet());
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(ff).getValue();
+ final ByteArrayOutputStream errorBaos = new
ByteArrayOutputStream();
+ final ByteArrayOutputStream successBaos = new
ByteArrayOutputStream();
+ session.read(ff, in -> splitRecords(in, failedIndices,
charset, inputFormat, errorBaos, successBaos));
+
+ if (errorBaos.size() > 0) {
+ final byte[] errorBytes = errorBaos.toByteArray();
+ FlowFile errorFf = session.clone(ff);
+ errorFf = session.write(errorFf, out ->
out.write(errorBytes));
+ copiedErrors.add(errorFf);
+ }
+ if (successBaos.size() > 0) {
+ final byte[] successBytes = successBaos.toByteArray();
+ FlowFile successFf = session.clone(ff);
+ successFf = session.write(successFf, out ->
out.write(successBytes));
+ successFf = session.removeAttribute(successFf,
"elasticsearch.bulk.error");
+ successfulDocuments.add(successFf);
+ }
+ }
+ }
+
session.transfer(copiedErrors, REL_ERRORS);
copiedErrors.forEach(e ->
session.getProvenanceReporter().send(
@@ -334,7 +1007,6 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
)
);
- final List<FlowFile> successfulDocuments = originals.stream().filter(f
-> !errorDocuments.contains(f)).map(session::clone).toList();
session.transfer(successfulDocuments, REL_SUCCESSFUL);
successfulDocuments.forEach(s ->
session.getProvenanceReporter().send(
@@ -345,5 +1017,84 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
)
)
);
+
+ session.transfer(allFlowFiles, REL_ORIGINAL);
+ }
+
+ /**
+ * Reads records from {@code in} according to {@code inputFormat} and
writes each record
+ * (as compact single-line JSON followed by {@code '\n'}) to either {@code
errorOut} or
+ * {@code successOut} depending on whether its zero-based index is in
{@code failedIndices}.
+ * <p>
+ * The {@code InputStream} is owned by the NiFi session and must not be
closed here;
+ * {@code BufferedReader} and {@code JsonParser} wrappers are
intentionally not closed so
+ * that they do not propagate a close to the underlying stream.
+ */
+ private void splitRecords(final InputStream in, final Set<Integer>
failedIndices, final String charset,
+ final InputFormat inputFormat, final
OutputStream errorOut,
+ final OutputStream successOut) throws
IOException {
+ // Track whether each output stream has received its first record so
we can write '\n'
+ // between records (not after the last one), avoiding a trailing blank
line.
+ boolean firstError = true;
+ boolean firstSuccess = true;
+ int idx = 0;
+ if (inputFormat == InputFormat.NDJSON) {
+ // BufferedReader not closed intentionally — 'in' is
session-managed
+ final BufferedReader reader = new BufferedReader(new
InputStreamReader(in, charset), READER_BUFFER_SIZE);
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String trimmed = line.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ final byte[] bytes = trimmed.getBytes(StandardCharsets.UTF_8);
+ if (failedIndices.contains(idx)) {
+ if (!firstError) {
+ errorOut.write('\n');
+ }
+ errorOut.write(bytes);
+ firstError = false;
+ } else {
+ if (!firstSuccess) {
+ successOut.write('\n');
+ }
+ successOut.write(bytes);
+ firstSuccess = false;
+ }
+ idx++;
+ }
+ } else {
+ // JSON Array: stream elements one at a time via JsonParser
+ // JsonParser not closed intentionally — 'in' is session-managed
+ final JsonParser parser = mapper.getFactory().createParser(new
InputStreamReader(in, charset));
+ JsonToken outerToken;
+ while ((outerToken = parser.nextToken()) != null) {
+ if (outerToken != JsonToken.START_ARRAY) {
+ continue;
+ }
+ JsonToken token;
+ while ((token = parser.nextToken()) != JsonToken.END_ARRAY) {
+ if (token == null) {
+ break;
+ }
+ final JsonNode node = mapper.readTree(parser);
+ final byte[] bytes = mapper.writeValueAsBytes(node);
+ if (failedIndices.contains(idx)) {
+ if (!firstError) {
+ errorOut.write('\n');
+ }
+ errorOut.write(bytes);
+ firstError = false;
+ } else {
+ if (!firstSuccess) {
+ successOut.write('\n');
+ }
+ successOut.write(bytes);
+ firstSuccess = false;
+ }
+ idx++;
+ }
+ }
+ }
}
}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 2a0fd4a318d..62b5865f551 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -516,7 +516,17 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
contentMap.putIfAbsent("@timestamp", atTimestamp);
}
- operationList.add(new IndexOperationRequest(index, type, id,
contentMap, indexOp, script, scriptedUpsert, dynamicTemplates,
bulkHeaderFields));
+ operationList.add(IndexOperationRequest.builder()
+ .index(index)
+ .type(type)
+ .id(id)
+ .fields(contentMap)
+ .operation(indexOp)
+ .script(script)
+ .scriptedUpsert(scriptedUpsert)
+ .dynamicTemplates(dynamicTemplates)
+ .headerFields(bulkHeaderFields)
+ .build());
}
private void operate(final List<IndexOperationRequest> operationList,
final List<Record> processedRecords, final List<Record> originalRecords,
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
index 37f0249f578..f023920120e 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java
@@ -40,6 +40,7 @@ import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -82,7 +83,7 @@ public class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest {
super.setup();
runner.setProperty(PutElasticsearchJson.ID_ATTRIBUTE, "doc_id");
- runner.setProperty(AbstractPutElasticsearch.BATCH_SIZE, "1");
+ runner.setProperty(PutElasticsearchJson.BATCH_SIZE, "1");
runner.assertValid();
}
@@ -547,4 +548,337 @@ public class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest {
final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE).getFirst();
assertTrue(flowFile.getAttribute("elasticsearch.put.error").contains("not"));
}
+
+ //
-------------------------------------------------------------------------
+ // NDJSON format tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testNdjsonFormat() {
+ // Switch to NDJSON mode (one JSON object per line)
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.NDJSON.getValue());
+ runner.assertValid();
+
+ // Three documents in a single FlowFile
+ final String ndjson = "{\"id\":\"1\",\"msg\":\"hello\"}\n"
+ + "{\"id\":\"2\",\"msg\":\"world\"}\n"
+ + "{\"id\":\"3\",\"msg\":\"foo\"}\n";
+
+ // Capture the operations sent to Elasticsearch
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(ndjson);
+ runner.run();
+
+ // All three documents indexed in one bulk call → one FlowFile through
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+ assertEquals(3, operationCount[0], "Expected 3 index operations from
NDJSON FlowFile");
+ }
+
+ @Test
+ void testNdjsonFormatBlankLinesIgnored() {
+ // Blank lines between NDJSON records should be silently skipped
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.NDJSON.getValue());
+ runner.assertValid();
+
+ final String ndjsonWithBlanks =
"\n{\"id\":\"1\",\"msg\":\"hello\"}\n\n{\"id\":\"2\",\"msg\":\"world\"}\n\n";
+
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(ndjsonWithBlanks);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ assertEquals(2, operationCount[0], "Expected 2 operations — blank
lines should be ignored");
+ }
+
+ @Test
+ void testNdjsonFormatInvalidLineRoutesToFailure() {
+ // A malformed JSON line should route the whole FlowFile to failure.
+ // IDENTIFIER_FIELD forces JSON parsing per line (via streaming
parser), which catches malformed input.
+ // Without it, lines are passed as raw bytes and validation happens
server-side.
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.NDJSON.getValue());
+ runner.setProperty(PutElasticsearchJson.IDENTIFIER_FIELD, "id");
+ runner.assertValid();
+
+ runner.enqueue("{\"id\":\"1\"}\nnot-json\n{\"id\":\"3\"}");
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+
+ final MockFlowFile failed =
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_FAILURE).getFirst();
+ assertNotNull(failed.getAttribute("elasticsearch.put.error"),
+ "Failed FlowFile should carry elasticsearch.put.error
attribute");
+ }
+
+ @Test
+ void testNdjsonFormatSizeBatchFlush() {
+ // Set MAX_BATCH_SIZE very small so each document forces its own flush,
+ // resulting in multiple bulk calls for a single FlowFile.
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.NDJSON.getValue());
+ runner.setProperty(PutElasticsearchJson.MAX_BATCH_SIZE, "1 B");
+ runner.assertValid();
+
+ final String ndjson =
"{\"id\":\"1\"}\n{\"id\":\"2\"}\n{\"id\":\"3\"}\n";
+
+ final int[] bulkCallCount = {0};
+ clientService.setEvalConsumer(items -> bulkCallCount[0]++);
+
+ runner.enqueue(ndjson);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ assertTrue(bulkCallCount[0] > 1, "Expected multiple bulk flushes when
MAX_BATCH_SIZE is tiny");
+ }
+
    @Test
    void testNdjsonFormatPartialFailureIsolatesSuccessAndErrorDocs() throws Exception {
        // When an NDJSON FlowFile contains records that partially fail, the successful and failed
        // documents should be routed to REL_SUCCESSFUL and REL_ERRORS respectively, each as a
        // new NDJSON clone containing only those documents.
        runner.setProperty(PutElasticsearchJson.INPUT_FORMAT, InputFormat.NDJSON.getValue());
        // not_found errors are treated as successes under this setting
        runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, "true");
        clientService.setResponse(IndexOperationResponse.fromJsonResponse(sampleErrorResponse));

        // 7 records in one FlowFile; sampleErrorResponse has errors at indices 4, 5, 6
        final String ndjson = "{\"id\":\"1\",\"field2\":\"20\"}\n"
                + "{\"id\":\"2\",\"field2\":\"20\"}\n"
                + "{\"id\":\"3\",\"field2\":\"20\"}\n"
                + "{\"id\":\"4\",\"field2\":\"not_found\"}\n"
                + "{\"id\":\"5\",\"field2\":\"20abcd\"}\n"
                + "{\"id\":\"6\",\"field2\":\"213,456.9\"}\n"
                + "{\"id\":\"7\",\"field2\":\"unit test\"}\n";

        runner.enqueue(ndjson);
        runner.run();

        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 1);
        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);

        // REL_SUCCESSFUL clone contains only the 4 successfully-indexed documents
        final MockFlowFile successFf = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL).getFirst();
        final String successContent = successFf.getContent();
        assertTrue(successContent.contains("\"id\":\"1\""), "Successful clone should include record 1");
        assertTrue(successContent.contains("\"id\":\"2\""), "Successful clone should include record 2");
        assertTrue(successContent.contains("\"id\":\"3\""), "Successful clone should include record 3");
        assertTrue(successContent.contains("not_found"), "Successful clone should include not_found record (treated as success)");
        assertFalse(successContent.contains("20abcd"), "Successful clone should not include failed record 5");
        assertFalse(successContent.contains("213,456.9"), "Successful clone should not include failed record 6");
        assertFalse(successContent.contains("unit test"), "Successful clone should not include failed record 7");

        // REL_ERRORS clone contains only the 3 failed documents
        final MockFlowFile errorFf = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_ERRORS).getFirst();
        final String errorContent = errorFf.getContent();
        assertTrue(errorContent.contains("20abcd"), "Error clone should include failed record 5");
        assertTrue(errorContent.contains("213,456.9"), "Error clone should include failed record 6");
        assertTrue(errorContent.contains("unit test"), "Error clone should include failed record 7");
        assertFalse(errorContent.contains("\"id\":\"1\""), "Error clone should not include successful record 1");
        errorFf.assertAttributeExists("elasticsearch.bulk.error");
    }
+
+ //
-------------------------------------------------------------------------
+ // JSON Array format tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testJsonArrayFormat() {
+ // Switch to JSON Array mode (top-level array of objects)
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ final String jsonArray =
"[{\"id\":\"1\",\"msg\":\"hello\"},{\"id\":\"2\",\"msg\":\"world\"},{\"id\":\"3\",\"msg\":\"foo\"}]";
+
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(jsonArray);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+ assertEquals(3, operationCount[0], "Expected 3 index operations from
JSON Array FlowFile");
+ }
+
+ @Test
+ void testJsonArrayFormatMultipleArraysInOneFlowFile() {
+ // A FlowFile may contain multiple top-level JSON arrays; all should
be indexed
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ // Two arrays concatenated in one FlowFile
+ final String twoArrays =
"[{\"id\":\"1\"},{\"id\":\"2\"}][{\"id\":\"3\"},{\"id\":\"4\"}]";
+
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(twoArrays);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ assertEquals(4, operationCount[0], "Expected 4 operations — 2 per
array, 2 arrays");
+ }
+
+ @Test
+ void testJsonArrayFormatMalformedRoutesToFailure() {
+ // A truncated / unclosed array should route to failure with a clear
error message
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ // Missing closing ']'
+ runner.enqueue("[{\"id\":\"1\"},{\"id\":\"2\"}");
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+
+ final MockFlowFile failed =
runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_FAILURE).getFirst();
+ assertNotNull(failed.getAttribute("elasticsearch.put.error"),
+ "Failed FlowFile should carry elasticsearch.put.error
attribute");
+ }
+
+ @Test
+ void testJsonArrayFormatPrettyPrinted() {
+ // Pretty-printed (multi-line) JSON array — newlines inside the array
must not confuse the parser
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ final String prettyArray =
+ "[\n"
+ + " { \"id\": \"1\", \"msg\": \"hello\" },\n"
+ + " { \"id\": \"2\", \"msg\": \"world\" },\n"
+ + " { \"id\": \"3\", \"msg\": \"foo\" }\n"
+ + "]\n";
+
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(prettyArray);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ assertEquals(3, operationCount[0], "Expected 3 operations from
pretty-printed JSON array");
+ }
+
+ @Test
+ void testJsonArrayFormatMultiplePrettyPrintedArrays() {
+ // Multiple pretty-printed arrays in one FlowFile (newlines between
and within arrays)
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ final String twoArraysPretty =
+ "[\n"
+ + " { \"id\": \"1\" },\n"
+ + " { \"id\": \"2\" }\n"
+ + "]\n"
+ + "[\n"
+ + " { \"id\": \"3\" },\n"
+ + " { \"id\": \"4\" }\n"
+ + "]\n";
+
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(twoArraysPretty);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ assertEquals(4, operationCount[0], "Expected 4 operations from two
pretty-printed arrays");
+ }
+
+ @Test
+ void testJsonArrayFormatMixedCompactAndPretty() {
+ // One compact array followed by one pretty-printed array in the same
FlowFile
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ final String mixed =
+ "[{\"id\":\"1\"},{\"id\":\"2\"}]\n"
+ + "[\n"
+ + " { \"id\": \"3\" },\n"
+ + " { \"id\": \"4\" }\n"
+ + "]\n";
+
+ final int[] operationCount = {0};
+ clientService.setEvalConsumer(items -> operationCount[0] +=
items.size());
+
+ runner.enqueue(mixed);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ assertEquals(4, operationCount[0], "Expected 4 operations from mixed
compact + pretty arrays");
+ }
+
+ @Test
+ void testJsonArrayFormatNotAnArrayRoutesToFailure() {
+ // Content that starts with something other than '[' is not a JSON
array
+ runner.setProperty(PutElasticsearchJson.INPUT_FORMAT,
InputFormat.JSON_ARRAY.getValue());
+ runner.assertValid();
+
+ runner.enqueue("{\"id\":\"1\"}");
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 0);
+ }
+
    // -------------------------------------------------------------------------
    // OUTPUT_BULK_REQUEST relationship test
    // -------------------------------------------------------------------------

+ @Test
+ void testOutputBulkRequestEnabled() {
+ // When Output Bulk Request is true, the raw NDJSON body should be
written
+ // to the bulk_request relationship in addition to normal processing.
+ runner.setProperty(PutElasticsearchJson.OUTPUT_BULK_REQUEST, "true");
+ runner.assertValid();
+
+ runner.enqueue(flowFileContents);
+ runner.run();
+
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+ runner.assertTransferCount(PutElasticsearchJson.REL_BULK_REQUEST, 1);
+
+ final MockFlowFile bulkRequest =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_BULK_REQUEST).getFirst();
+ // The bulk request body must contain the action metadata line and the
document line
+ final String body = bulkRequest.getContent();
+ assertTrue(body.contains("\"index\""), "Bulk request body should
contain action metadata");
+ assertTrue(body.contains("msg"), "Bulk request body should contain
document field from flowFileContents");
+ }
+
+ @Test
+ void testOutputBulkRequestDisabledByDefault() {
+ // The bulk_request relationship should not appear unless explicitly
enabled
+ runner.assertValid();
+
+ runner.enqueue(flowFileContents);
+ runner.run();
+
+ runner.assertTransferCount(PutElasticsearchJson.REL_BULK_REQUEST, 0);
+ }
}
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
index f7b55cada78..e1724b7ab7a 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchProcessorMigration.java
@@ -132,7 +132,8 @@ public class TestElasticSearchProcessorMigration {
"put-es-json-scripted-upsert",
PutElasticsearchJson.SCRIPTED_UPSERT.getName(),
"put-es-json-dynamic_templates",
PutElasticsearchJson.DYNAMIC_TEMPLATES.getName(),
"put-es-json-charset", PutElasticsearchJson.CHARSET.getName(),
- "put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName()
+ "put-es-json-not_found-is-error",
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName(),
+ AbstractPutElasticsearch.BATCH_SIZE.getName(),
PutElasticsearchJson.BATCH_SIZE.getName()
);
final Map<String, String> expectedRenamed = new
HashMap<>(expectedOwnRenamed);