agturley commented on code in PR #10981:
URL: https://github.com/apache/nifi/pull/10981#discussion_r3036428972


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -153,175 +222,705 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
             SCRIPT,
             SCRIPTED_UPSERT,
             DYNAMIC_TEMPLATES,
+            INPUT_FORMAT,
             BATCH_SIZE,
+            MAX_BATCH_SIZE,
             CHARSET,
             MAX_JSON_FIELD_STRING_LENGTH,
             CLIENT_SERVICE,
             LOG_ERROR_RESPONSES,
             OUTPUT_ERROR_RESPONSES,
+            OUTPUT_BULK_REQUEST,
             NOT_FOUND_IS_SUCCESSFUL
     );
+
     static final Set<Relationship> BASE_RELATIONSHIPS =
             Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_SUCCESSFUL, 
REL_ERRORS);
 
+    private static final int READER_BUFFER_SIZE = 65536;
+
+    private final AtomicBoolean bulkRequestOutputEnabled = new 
AtomicBoolean(false);
+    private boolean outputBulkRequest;
+    private ObjectReader mapReader;
+
     @Override
     Set<Relationship> getBaseRelationships() {
         return BASE_RELATIONSHIPS;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>(super.getRelationships());
+        if (bulkRequestOutputEnabled.get()) {
+            rels.add(REL_BULK_REQUEST);
+        }
+        return rels;
+    }
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return DESCRIPTORS;
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        if (OUTPUT_BULK_REQUEST.equals(descriptor)) {
+            bulkRequestOutputEnabled.set(Boolean.parseBoolean(newValue));
+        }
+    }
+
     @Override
     public void migrateProperties(final PropertyConfiguration config) {
         super.migrateProperties(config);
 
+        // Migrate legacy property names from PutElasticsearchJson
         config.removeProperty("put-es-json-error-documents");
         config.renameProperty("put-es-json-id-attr", ID_ATTRIBUTE.getName());
         config.renameProperty("put-es-json-script", SCRIPT.getName());
         config.renameProperty("put-es-json-scripted-upsert", 
SCRIPTED_UPSERT.getName());
         config.renameProperty("put-es-json-dynamic_templates", 
DYNAMIC_TEMPLATES.getName());
         config.renameProperty("put-es-json-charset", CHARSET.getName());
         config.renameProperty("put-es-json-not_found-is-error", 
AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
+
+        // Migrate "Batch Size" (from PutElasticsearchJson) to the new name 
used in this processor
+        config.renameProperty(AbstractPutElasticsearch.BATCH_SIZE.getName(), 
BATCH_SIZE.getName());
+
+        // If INPUT_FORMAT was not explicitly set, this flow was migrated from 
PutElasticsearchJson — default to Single JSON
+        if (!config.hasProperty(INPUT_FORMAT.getName())) {
+            config.setProperty(INPUT_FORMAT.getName(), 
InputFormat.SINGLE_JSON.getValue());
+        }
     }
 
     @Override
     public void migrateRelationships(final RelationshipConfiguration config) {
         super.migrateRelationships(config);
 
+        // PutElasticsearchJson used "success" before it was renamed to 
"original"
         config.renameRelationship("success", 
AbstractPutElasticsearch.REL_ORIGINAL.getName());
     }
 
     @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
-
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.outputBulkRequest = 
context.getProperty(OUTPUT_BULK_REQUEST).asBoolean();
+        this.bulkRequestOutputEnabled.set(this.outputBulkRequest);
+        this.mapReader = mapper.readerFor(new TypeReference<Map<String, 
Object>>() { });
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
-        final List<FlowFile> flowFiles = session.get(batchSize);
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
-
+        final long maxBatchBytes = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
         final String idAttribute = 
context.getProperty(ID_ATTRIBUTE).getValue();
+        final InputFormat inputFormat = 
InputFormat.fromValue(context.getProperty(INPUT_FORMAT).getValue());
+        final int batchSize = InputFormat.SINGLE_JSON == inputFormat
+                ? 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()
+                : Integer.MAX_VALUE;
+        int flowFilesProcessed = 0;
+
+        // Tracks all FlowFiles that were successfully parsed and submitted 
(even partially)
+        final Set<FlowFile> allProcessedFlowFiles = new LinkedHashSet<>();
+        // Tracks FlowFiles that had at least one Elasticsearch document error
+        final Set<FlowFile> errorFlowFiles = new LinkedHashSet<>();
+        // Deferred bulk-error attributes: applied after each FlowFile's 
InputStream is closed
+        final Map<FlowFile, List<Map<String, Object>>> pendingBulkErrors = new 
HashMap<>();
+        // Failed local record indices per FlowFile — O(error count) memory; 
used at finalization
+        // to re-read FlowFiles once and reconstruct error/success content 
without buffering all bytes
+        final Map<FlowFile, Set<Integer>> pendingErrorRecordIndices = new 
LinkedHashMap<>();
+
+        // Current chunk accumulation — operationFlowFiles and 
operationRecordIndices are parallel to operations (same index)
+        final List<FlowFile> operationFlowFiles = new ArrayList<>();
+        final List<IndexOperationRequest> operations = new ArrayList<>();
+        final List<Integer> operationRecordIndices = new ArrayList<>();
+        long totalBytesAccumulated = 0;
+        long chunkBytes = 0;
 
-        final List<FlowFile> originals = new ArrayList<>(flowFiles.size());
-        final List<IndexOperationRequest> operations = new 
ArrayList<>(flowFiles.size());
+        while (flowFile != null) {
+            final String indexOp = 
context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+            final String type = 
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+            final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+            final String flowFileIdAttribute = 
StringUtils.isNotBlank(idAttribute) ? flowFile.getAttribute(idAttribute) : null;
 
-        for (final FlowFile input : flowFiles) {
-            addOperation(operations, originals, idAttribute, context, session, 
input);
+            final Map<String, Object> scriptMap = getMapFromAttribute(SCRIPT, 
context, flowFile);
+            final boolean scriptedUpsert = 
context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(flowFile).asBoolean();
+            final Map<String, Object> dynamicTemplatesMap = 
getMapFromAttribute(DYNAMIC_TEMPLATES, context, flowFile);
+            final Map<String, String> dynamicProperties = 
getRequestParametersFromDynamicProperties(context, flowFile);
+            final Map<String, String> bulkHeaderFields = 
getBulkHeaderParameters(dynamicProperties);
+
+            boolean parseError = false;
+            try {
+                final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
+                if (InputFormat.NDJSON == inputFormat) {
+                    // NDJSON: each non-blank line is one JSON document.
+                    // Index/Create operations pass raw UTF-8 bytes directly 
to avoid Map allocation.
+                    // Update/Delete/Upsert parse into a Map so the bulk 
serializer can wrap the payload.
+                    int localRecordIndex = 0;
+                    try (final BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(session.read(flowFile), 
charset), READER_BUFFER_SIZE)) {
+                        String line;
+                        while ((line = reader.readLine()) != null) {
+                            final String trimmedLine = line.trim();
+                            if (trimmedLine.isEmpty()) {
+                                continue;
+                            }
+                            final IndexOperationRequest opRequest;
+                            final long docBytes;
+                            if (o == IndexOperationRequest.Operation.Index || 
o == IndexOperationRequest.Operation.Create) {
+                                final String id = extractId(trimmedLine, 
idAttribute, flowFileIdAttribute);
+                                final byte[] rawJsonBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = rawJsonBytes.length;
+                            } else {
+                                final Map<String, Object> contentMap = 
mapReader.readValue(trimmedLine);
+                                final String id = resolveId(contentMap, 
idAttribute, flowFileIdAttribute);
+                                opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                docBytes = 
trimmedLine.getBytes(StandardCharsets.UTF_8).length;
+                            }
+                            operations.add(opRequest);
+                            operationFlowFiles.add(flowFile);
+                            operationRecordIndices.add(localRecordIndex++);
+                            chunkBytes += docBytes;
+                            totalBytesAccumulated += docBytes;
+                            if (chunkBytes >= maxBatchBytes) {
+                                flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                                operations.clear();
+                                operationFlowFiles.clear();
+                                operationRecordIndices.clear();
+                                chunkBytes = 0;
+                            }
+                        }
+                    }
+                } else if (InputFormat.JSON_ARRAY == inputFormat) {
+                    // JSON Array: the FlowFile may contain one or more 
top-level JSON arrays.
+                    // Arrays are read sequentially; elements within each 
array are streamed one at a
+                    // time via JsonParser to avoid loading the entire content 
into memory.
+                    // Each element is re-serialized to compact bytes for 
Index/Create,
+                    // or parsed into a Map for Update/Delete/Upsert.
+                    int localRecordIndex = 0;
+                    try (final InputStreamReader isr = new 
InputStreamReader(session.read(flowFile), charset);
+                         final JsonParser parser = 
mapper.getFactory().createParser(isr)) {
+                        JsonToken outerToken;
+                        while ((outerToken = parser.nextToken()) != null) {
+                            if (outerToken != JsonToken.START_ARRAY) {
+                                throw new IOException("Expected a JSON array 
but found: " + outerToken);
+                            }
+                            JsonToken token;
+                            while ((token = parser.nextToken()) != 
JsonToken.END_ARRAY) {
+                                if (token == null) {
+                                    throw new IOException("Malformed JSON 
Array: reached end of stream before the closing ']'. " +
+                                            "Verify the FlowFile content is a 
complete, valid JSON array.");
+                                }
+                                final long startOffset = 
parser.currentTokenLocation().getCharOffset();
+                                final IndexOperationRequest opRequest;
+                                if (o == IndexOperationRequest.Operation.Index 
|| o == IndexOperationRequest.Operation.Create) {
+                                    final JsonNode node = 
mapper.readTree(parser);
+                                    final long docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                    final byte[] rawJsonBytes = 
mapper.writeValueAsBytes(node);
+                                    final String id = extractId(node, 
idAttribute, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .rawJsonBytes(rawJsonBytes)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                } else {
+                                    final Map<String, Object> contentMap = 
mapReader.readValue(parser);
+                                    final long docBytes = Math.max(1, 
parser.currentLocation().getCharOffset() - startOffset);
+                                    final String id = resolveId(contentMap, 
idAttribute, flowFileIdAttribute);
+                                    opRequest = IndexOperationRequest.builder()
+                                        .index(index)
+                                        .type(type)
+                                        .id(id)
+                                        .fields(contentMap)
+                                        .operation(o)
+                                        .script(scriptMap)
+                                        .scriptedUpsert(scriptedUpsert)
+                                        .dynamicTemplates(dynamicTemplatesMap)
+                                        .headerFields(bulkHeaderFields)
+                                        .build();
+                                    chunkBytes += docBytes;
+                                    totalBytesAccumulated += docBytes;
+                                }
+                                operations.add(opRequest);
+                                operationFlowFiles.add(flowFile);
+                                operationRecordIndices.add(localRecordIndex++);
+                                if (chunkBytes >= maxBatchBytes) {
+                                    flushChunk(operations, operationFlowFiles, 
operationRecordIndices, errorFlowFiles, flowFile, pendingBulkErrors, 
pendingErrorRecordIndices, context, session);
+                                    operations.clear();
+                                    operationFlowFiles.clear();
+                                    operationRecordIndices.clear();
+                                    chunkBytes = 0;
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    // Single JSON: the entire FlowFile is one document.
+                    // For Index/Create, the input is parsed and re-serialized 
to compact JSON to strip
+                    // any embedded newlines from pretty-printed input. The 
Elasticsearch bulk NDJSON
+                    // protocol uses newlines as record delimiters, so 
embedded newlines would corrupt
+                    // the request. Update/Delete/Upsert parse directly into a 
Map.
+                    try (final InputStream in = session.read(flowFile)) {
+                        final IndexOperationRequest opRequest;
+                        if (o == IndexOperationRequest.Operation.Index || o == 
IndexOperationRequest.Operation.Create) {
+                            // Re-serialize via Jackson to produce compact 
single-line JSON.
+                            // Pretty-printed input would contain embedded 
newlines that break the
+                            // Elasticsearch bulk NDJSON protocol (each 
document must be one line).
+                            final JsonNode node = mapper.readTree(new 
InputStreamReader(in, charset));
+                            final byte[] rawJsonBytes = 
mapper.writeValueAsBytes(node);

Review Comment:
   Corrected this behavior. I also added a new option for the NDJSON and 
JSON Array modes to optionally suppress nulls (there is a performance penalty, 
but I wanted to keep that ability). Because this does not use a Controller 
Service, the processor itself exposes the option, which could be confusing for 
users, so I added a property description that should clear up any confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to