This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new e99a4259f3 NIFI-11480 add option to group PutElasticsearchRecord 
errors by Elasticsearch _bulk error type
e99a4259f3 is described below

commit e99a4259f3d8abdbe97344384ef7622f99888e4c
Author: Chris Sampson <[email protected]>
AuthorDate: Mon Jun 26 21:38:31 2023 +0100

    NIFI-11480 add option to group PutElasticsearchRecord errors by 
Elasticsearch _bulk error type
    
    Signed-off-by: Matt Burgess <[email protected]>
---
 .../elasticsearch/PutElasticsearchRecord.java      | 229 +++++++++++++++------
 .../elasticsearch/PutElasticsearchJsonTest.groovy  |  41 +++-
 .../PutElasticsearchRecordTest.groovy              |  97 ++++++++-
 .../mock/MockBulkLoadClientService.groovy          |  34 ++-
 .../integration/AbstractElasticsearchITBase.java   |   2 +-
 nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml |   2 +-
 6 files changed, 326 insertions(+), 79 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 6801d4bff4..1f81aab2d0 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -36,6 +36,7 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
 import org.apache.nifi.elasticsearch.IndexOperationResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -59,6 +60,7 @@ import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StopWatch;
@@ -79,6 +81,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "elasticsearch8", "put", "index", "record"})
@@ -87,7 +90,8 @@ import java.util.concurrent.atomic.AtomicLong;
         @WritesAttribute(attribute = "elasticsearch.put.error",
                 description = "The error message if there is an issue parsing 
the FlowFile records, sending the parsed documents to Elasticsearch or parsing 
the Elasticsearch response."),
         @WritesAttribute(attribute = "elasticsearch.put.error.count", 
description = "The number of records that generated errors in the Elasticsearch 
_bulk API."),
-        @WritesAttribute(attribute = "elasticsearch.put.success.count", 
description = "The number of records that were successfully processed by the 
Elasticsearch _bulk API.")
+        @WritesAttribute(attribute = "elasticsearch.put.success.count", 
description = "The number of records that were successfully processed by the 
Elasticsearch _bulk API."),
+        @WritesAttribute(attribute = "elasticsearch.bulk.error", description = 
"The _bulk response if there was an error during processing the record within 
Elasticsearch.")
 })
 @DynamicProperties({
         @DynamicProperty(
@@ -267,7 +271,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         .description("If true, \"not_found\" Elasticsearch Document associated 
Records will be routed to the \"" +
                 REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise 
to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " +
                 "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is 
\"true\" then \"not_found\" responses from Elasticsearch " +
-                "will be sent to the " + REL_ERROR_RESPONSES.getName() + " 
relationship")
+                "will be sent to the " + REL_ERROR_RESPONSES.getName() + " 
relationship.")
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         .allowableValues("true", "false")
         .defaultValue("true")
@@ -275,6 +279,21 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         .dependsOn(RESULT_RECORD_WRITER)
         .build();
 
+    /**
+     * Optional flag: when "true", records routed to the failed records relationship are split into
+     * one FlowFile per Elasticsearch _bulk error type instead of a single FlowFile.
+     * Only applicable when a Result Record Writer is configured.
+     */
+    static final PropertyDescriptor GROUP_BULK_ERRORS_BY_TYPE = new PropertyDescriptor.Builder()
+            .name("put-es-record-bulk-error-groups")
+            .displayName("Group Results by Bulk Error Type")
+            .description("If true, the response from Elasticsearch will be examined for _bulk errors. " +
+                    "The failed records written to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship will be grouped by error type " +
+                    "and the error related to the first record within the FlowFile added to the FlowFile as \"elasticsearch.bulk.error\". " +
+                    "If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() + "\" is \"false\" then records associated with \"not_found\" " +
+                    "Elasticsearch document responses will also be sent to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(false)
+            .dependsOn(RESULT_RECORD_WRITER)
+            .build();
+
     static final PropertyDescriptor DATE_FORMAT = new 
PropertyDescriptor.Builder()
         .name("put-es-record-at-timestamp-date-format")
         .displayName("Date Format")
@@ -316,15 +335,22 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, 
BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
         INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, 
AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
         SCRIPT_RECORD_PATH, SCRIPTED_UPSERT_RECORD_PATH, 
DYNAMIC_TEMPLATES_RECORD_PATH, DATE_FORMAT, TIME_FORMAT,
-        TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, 
RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL
+        TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, 
RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL,
+        GROUP_BULK_ERRORS_BY_TYPE
     ));
     static final Set<Relationship> BASE_RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
         REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, 
REL_SUCCESSFUL_RECORDS
     )));
 
+    private static final String OUTPUT_TYPE_SUCCESS = "success";
+    private static final String OUTPUT_TYPE_ERROR = "error";
+    private static final String OUTPUT_TYPE_UNKNOWN_EXCEPTION = 
"unknown_exception";
+    private static final String OUTPUT_TYPE_NOT_FOUND = "not_found";
+
     private RecordPathCache recordPathCache;
     private RecordReaderFactory readerFactory;
     private RecordSetWriterFactory writerFactory;
+    private boolean groupBulkErrors;
 
     private volatile String dateFormat;
     private volatile String timeFormat;
@@ -350,6 +376,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         this.writerFactory = 
context.getProperty(RESULT_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
         this.notFoundIsSuccessful = 
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+        this.groupBulkErrors = 
context.getProperty(GROUP_BULK_ERRORS_BY_TYPE).asBoolean();
 
         this.dateFormat = 
context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
         if (this.dateFormat == null) {
@@ -482,15 +509,9 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         final BulkOperation bundle = new BulkOperation(operationList, 
originals, reader.getSchema());
         final ResponseDetails responseDetails = indexDocuments(bundle, 
session, input, requestParameters);
 
-        if (responseDetails.getErrored() != null) {
-            resultRecords.add(responseDetails.getErrored());
-        }
-        erroredRecords.getAndAdd(responseDetails.getErrorCount());
-
-        if (responseDetails.getSucceeded() != null) {
-            resultRecords.add(responseDetails.getSucceeded());
-        }
         successfulRecords.getAndAdd(responseDetails.getSuccessCount());
+        erroredRecords.getAndAdd(responseDetails.getErrorCount());
+        
resultRecords.addAll(responseDetails.getOutputs().values().stream().map(Output::getFlowFile).collect(Collectors.toList()));
 
         operationList.clear();
         originals.clear();
@@ -515,54 +536,73 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
 
         final int numErrors = errors.size();
         final int numSuccessful = response.getItems() == null ? 0 : 
response.getItems().size() - numErrors;
-        FlowFile errorFF = null;
-        FlowFile successFF = null;
+        final Map<String, Output> outputs = new HashMap<>();
 
         if (writerFactory != null) {
             try {
-                successFF = session.create(input);
-                errorFF = session.create(input);
-
-                try (final OutputStream errorOutputStream = 
session.write(errorFF);
-                     final RecordSetWriter errorWriter = 
writerFactory.createWriter(getLogger(), bundle.getSchema(), errorOutputStream, 
errorFF);
-                     final OutputStream successOutputStream = 
session.write(successFF);
-                     final RecordSetWriter successWriter = 
writerFactory.createWriter(getLogger(), bundle.getSchema(), 
successOutputStream, successFF)) {
-
-                    errorWriter.beginRecordSet();
-                    successWriter.beginRecordSet();
-                    for (int o = 0; o < bundle.getOriginalRecords().size(); 
o++) {
-                        if (errors.containsKey(o)) {
-                            
errorWriter.write(bundle.getOriginalRecords().get(o));
+                for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
+                    final String type;
+                    final Relationship relationship;
+                    final Map<String, Object> error;
+                    if (errors.containsKey(o)) {
+                        relationship = REL_FAILED_RECORDS;
+                        error = errors.get(o);
+                        if (groupBulkErrors) {
+                            if (isElasticsearchNotFound().test(error)) {
+                                type = OUTPUT_TYPE_NOT_FOUND;
+                            } else {
+                                type = getErrorType(error);
+                            }
                         } else {
-                            
successWriter.write(bundle.getOriginalRecords().get(o));
+                            type = OUTPUT_TYPE_ERROR;
                         }
+                    } else {
+                        relationship = REL_SUCCESSFUL_RECORDS;
+                        error = null;
+                        type = OUTPUT_TYPE_SUCCESS;
                     }
-                    errorWriter.finishRecordSet();
-                    successWriter.finishRecordSet();
+                    final Output output = getOutputByType(outputs, type, 
session, relationship, input, bundle.getSchema());
+                    output.write(bundle.getOriginalRecords().get(o), error);
                 }
 
-                if (numErrors > 0) {
-                    errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, 
String.valueOf(numErrors));
-                    session.transfer(errorFF, REL_FAILED_RECORDS);
-                } else {
-                    session.remove(errorFF);
-                }
-
-                if (numSuccessful > 0) {
-                    successFF = session.putAttribute(successFF, 
ATTR_RECORD_COUNT, String.valueOf(numSuccessful));
-                    session.transfer(successFF, REL_SUCCESSFUL_RECORDS);
-                } else {
-                    session.remove(successFF);
+                for (final Output output : outputs.values()) {
+                    output.transfer(session);
                 }
             } catch (final IOException | SchemaNotFoundException ex) {
                 getLogger().error("Unable to write error/successful records", 
ex);
-                session.remove(errorFF);
-                session.remove(successFF);
+                outputs.values().forEach(o -> {
+                    try {
+                        o.remove(session);
+                    } catch (IOException ioe) {
+                        getLogger().warn("Error closing RecordSetWriter for 
FlowFile", ioe);
+                    }
+                });
                 throw ex;
             }
         }
 
-        return new ResponseDetails(successFF, numSuccessful, errorFF, 
numErrors);
+        return new ResponseDetails(outputs, numSuccessful, numErrors);
+    }
+
+    /**
+     * Extracts the Elasticsearch error type ({@code error.type}) from a single _bulk item error.
+     *
+     * @param errorInner the error entry for one _bulk item; expected to hold an "error" map with a "type" string
+     * @return the error type, or {@code unknown_exception} when the entry is null, has no "error" map,
+     *         or the "type" is missing or not a String (so the group name is never null)
+     */
+    @SuppressWarnings("unchecked")
+    private String getErrorType(final Map<String, Object> errorInner) {
+        final Object error = errorInner == null ? null : errorInner.get("error");
+        if (error instanceof Map) {
+            // unchecked cast is safe here: only get(Object) is called on the map
+            final Object type = ((Map<String, Object>) error).get("type");
+            if (type instanceof String) {
+                return (String) type;
+            }
+        }
+        return OUTPUT_TYPE_UNKNOWN_EXCEPTION;
+    }
+
+    /**
+     * Looks up the Output registered under {@code type}, creating and registering a new one on first use.
+     *
+     * @param outputs      outputs created so far for the current _bulk response, keyed by output type
+     * @param type         output type key ("success", "error", or a specific error type)
+     * @param session      session used to create the Output's FlowFile
+     * @param relationship relationship a newly created Output will be transferred to
+     * @param input        parent FlowFile for a newly created Output
+     * @param schema       record schema handed to the record writer of a newly created Output
+     * @return the existing or newly created Output for {@code type}
+     * @throws IOException             if a new Output cannot be created
+     * @throws SchemaNotFoundException if a new Output cannot be created
+     */
+    private Output getOutputByType(final Map<String, Output> outputs, final String type, final ProcessSession session,
+                                   final Relationship relationship, final FlowFile input, final RecordSchema schema)
+            throws IOException, SchemaNotFoundException {
+        final Output existing = outputs.get(type);
+        if (existing != null) {
+            return existing;
+        }
+
+        // the plain "error" and "success" groups are not tied to a specific error type
+        final boolean genericType = OUTPUT_TYPE_ERROR.equals(type) || OUTPUT_TYPE_SUCCESS.equals(type);
+        final Output created = new Output(session, writerFactory, getLogger(), schema, input, relationship,
+                genericType ? null : type);
+        outputs.put(type, created);
+        return created;
     }
 
     private void formatDateTimeFields(final Map<String, Object> contentMap, 
final Record record) {
@@ -706,6 +746,30 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         }
     }
 
+    /**
+     * Holder for the outcome of one _bulk request: the outputs that were produced (keyed by output type)
+     * together with the number of successful and errored items.
+     */
+    private static class ResponseDetails {
+        final Map<String, Output> outputs;
+        final int errorCount;
+        final int successCount;
+
+        ResponseDetails(final Map<String, Output> outputs, final int successCount, final int errorCount) {
+            this.errorCount = errorCount;
+            this.successCount = successCount;
+            this.outputs = outputs;
+        }
+
+        /** @return number of items reported as successful */
+        public int getSuccessCount() {
+            return this.successCount;
+        }
+
+        /** @return number of items reported as errors */
+        public int getErrorCount() {
+            return this.errorCount;
+        }
+
+        /** @return the outputs created for this response, keyed by output type */
+        public Map<String, Output> getOutputs() {
+            return this.outputs;
+        }
+    }
+
     private String determineDateFormat(final RecordFieldType recordFieldType) {
         final String format;
         switch (recordFieldType) {
@@ -730,33 +794,74 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
                 : stringValue;
     }
 
-    private static class ResponseDetails {
-        final FlowFile errored;
-        final FlowFile succeeded;
-        final int errorCount;
-        final int successCount;
+    private static class Output {
+        private FlowFile flowFile;
+        private final RecordSetWriter writer;
+        private final Relationship relationship;
+        private final String errorType;
 
-        ResponseDetails(final FlowFile succeeded, final int successCount, 
final FlowFile errored, final int errorCount) {
-            this.succeeded = succeeded;
-            this.successCount = successCount;
-            this.errored = errored;
-            this.errorCount = errorCount;
+        private String exampleError;
+
+        private int numRecords;
+
+        /**
+         * Creates a child FlowFile of {@code input} and opens a record writer on it, ready to receive records.
+         * If the writer cannot be opened or started, the stream is closed and the new FlowFile is removed
+         * from the session before the failure is rethrown, so no half-initialised FlowFile is left behind.
+         *
+         * @param session       session used to create and write the FlowFile
+         * @param writerFactory factory for the record writer
+         * @param logger        logger handed to the record writer
+         * @param schema        schema of the records to be written
+         * @param input         parent FlowFile
+         * @param relationship  relationship this output is transferred to
+         * @param errorType     specific error type grouped by this output, or null when not grouped by type
+         * @throws IOException             if the writer cannot be created or started
+         * @throws SchemaNotFoundException if the writer cannot be created
+         */
+        public Output(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger,
+                      final RecordSchema schema, final FlowFile input, final Relationship relationship, final String errorType)
+                throws IOException, SchemaNotFoundException {
+            this.flowFile = session.create(input);
+            this.relationship = relationship;
+            this.errorType = errorType;
+
+            OutputStream outputStream = null;
+            RecordSetWriter recordSetWriter = null;
+            try {
+                outputStream = session.write(this.flowFile);
+                recordSetWriter = writerFactory.createWriter(logger, schema, outputStream, this.flowFile);
+                recordSetWriter.beginRecordSet();
+            } catch (final IOException | SchemaNotFoundException | RuntimeException e) {
+                // this instance never reaches the caller, so nobody else can clean up its FlowFile
+                try {
+                    // NOTE(review): assumes closing the writer also closes the underlying stream,
+                    // as transfer()/remove() already rely on - confirm
+                    if (recordSetWriter != null) {
+                        recordSetWriter.close();
+                    } else if (outputStream != null) {
+                        outputStream.close();
+                    }
+                } catch (final IOException closeFailure) {
+                    e.addSuppressed(closeFailure);
+                }
+                session.remove(this.flowFile);
+                throw e;
+            }
+            this.writer = recordSetWriter;
+        }
+
+        public FlowFile getFlowFile() {
+            return flowFile;
         }
 
-        public FlowFile getSucceeded() {
-            return succeeded;
+        /**
+         * Writes one record to this output and, for outputs grouped by error type, remembers the
+         * JSON form of the first error seen as the example error for the FlowFile.
+         *
+         * @param record the record to write
+         * @param error  the _bulk error associated with the record, or null for successful records
+         * @throws IOException if the record cannot be written
+         */
+        public void write(final Record record, final Map<String, Object> error) throws IOException {
+            numRecords++;
+            this.writer.write(record);
+            if (errorType != null && exampleError == null && error != null) {
+                try {
+                    exampleError = MAPPER.writeValueAsString(error);
+                } catch (JsonProcessingException e) {
+                    // the message may be null, and may contain characters that would break the JSON string
+                    final String reason = String.valueOf(e.getMessage())
+                            .replace("\\", "\\\\")
+                            .replace("\"", "\\\"")
+                            .replace("\r", "\\r")
+                            .replace("\n", "\\n")
+                            .replace("\t", "\\t");
+                    exampleError = String.format(
+                            "{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                            reason
+                    );
+                }
+            }
         }
 
-        public FlowFile getErrored() {
-            return errored;
+        private void close() throws IOException {
+            if (this.writer != null) {
+                this.writer.close();
+            }
         }
 
-        public int getErrorCount() {
-            return errorCount;
+        /**
+         * Finishes the record set, closes the writer and routes the FlowFile: it is transferred to this
+         * output's relationship (with the record count and, for typed outputs, the example error as
+         * attributes) when at least one record was written, and removed from the session otherwise.
+         *
+         * @param session session that owns the FlowFile
+         * @throws IOException if the record set cannot be finished or the writer cannot be closed
+         */
+        public void transfer(final ProcessSession session) throws IOException {
+            writer.finishRecordSet();
+            this.close();
+
+            if (numRecords <= 0) {
+                // nothing was written, so there is nothing worth routing
+                session.remove(this.flowFile);
+                return;
+            }
+
+            final Map<String, String> flowFileAttributes = new HashMap<>(2, 1);
+            flowFileAttributes.put(ATTR_RECORD_COUNT, String.valueOf(numRecords));
+            if (errorType != null) {
+                flowFileAttributes.put("elasticsearch.bulk.error", exampleError);
+            }
+
+            this.flowFile = session.putAllAttributes(this.flowFile, flowFileAttributes);
+            session.transfer(this.flowFile, this.relationship);
         }
 
-        public int getSuccessCount() {
-            return successCount;
+        public void remove(final ProcessSession session) throws IOException {
+            this.close();
+            session.remove(this.flowFile);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
index 6dfc68d77f..13fe7a6d0b 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
@@ -316,8 +316,10 @@ class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest<PutElasticse
                 [ id: "1", field1: 'value1', field2: '20' ],
                 [ id: "2", field1: 'value1', field2: '20' ],
                 [ id: "2", field1: 'value1', field2: '20' ],
-                [ id: "3", field1: 'value1', field2: 'not_found' ],
-                [ id: "4", field1: 'value1', field2: '20abcd' ]
+                [ id: "4", field1: 'value1', field2: 'not_found' ],
+                [ id: "5", field1: 'value1', field2: '20abcd' ],
+                [ id: "6", field1: 'value1', field2: '213,456.9' ],
+                [ id: "7", field1: 'value1', field2: 'unit test' ]
         ]
 
         for (final def val : values) {
@@ -329,7 +331,7 @@ class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest<PutElasticse
         runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
-        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
1)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
3)
         runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         def failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0]
@@ -337,7 +339,20 @@ class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest<PutElasticse
         failedDoc.assertAttributeExists("elasticsearch.bulk.error")
         failedDoc.assertAttributeNotExists("elasticsearch.put.error")
         assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("mapper_parsing_exception"))
-        assertEquals(1,
+
+        failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1]
+        assertThat(failedDoc.getContent(), containsString("213,456.9"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("mapper_parsing_exception"))
+
+        failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[2]
+        assertThat(failedDoc.getContent(), containsString("unit test"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("some_other_exception"))
+
+        assertEquals(3,
                 runner.getProvenanceEvents().stream().filter({
                     e -> ProvenanceEventType.SEND == e.getEventType() && 
"Elasticsearch _bulk operation error" == e.getDetails()
                 }).count()
@@ -359,7 +374,7 @@ class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest<PutElasticse
         runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 3)
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
-        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
2)
+        runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
4)
         runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
1)
 
         failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0]
@@ -374,11 +389,25 @@ class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest<PutElasticse
         failedDoc.assertAttributeNotExists("elasticsearch.put.error")
         assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("number_format_exception"))
 
+        failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[2]
+        assertThat(failedDoc.getContent(), containsString("213,456.9"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("mapper_parsing_exception"))
+
+        failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[3]
+        assertThat(failedDoc.getContent(), containsString("unit test"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("some_other_exception"))
+
         final String errorResponses = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
         assertThat(errorResponses, containsString("not_found"))
         assertThat(errorResponses, containsString("For input string: 20abc"))
+        assertThat(errorResponses, containsString("For input string: 
213,456.9"))
+        assertThat(errorResponses, containsString("For input string: unit 
test"))
 
-        assertEquals(2,
+        assertEquals(4,
                 runner.getProvenanceEvents().stream().filter({
                     e -> ProvenanceEventType.SEND == e.getEventType() && 
"Elasticsearch _bulk operation error" == e.getDetails()
                 }).count()
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index f3992d79da..5378a8a43f 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -868,21 +868,24 @@ class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest<PutElastic
                 [ name: "field2", type: "string"]
             ]
         ]))
+        registry.addSchema("errorTest", AvroTypeUtil.createSchema(new 
Schema.Parser().parse(newSchema)))
 
         def values = [
             [ id: "1", field1: 'value1', field2: '20' ],
             [ id: "2", field1: 'value1', field2: '20' ],
             [ id: "2", field1: 'value1', field2: '20' ],
-            [ id: "3", field1: 'value1', field2: 'not_found' ],
-            [ id: "4", field1: 'value1', field2: '20abcd' ]
+            [ id: "4", field1: 'value1', field2: 'not_found' ],
+            [ id: "5", field1: 'value1', field2: '20abcd' ],
+            [ id: "6", field1: 'value1', field2: '213,456.9' ],
+            [ id: "7", field1: 'value1', field2: 'unit test' ]
         ]
 
         clientService.response = 
IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
 
-        registry.addSchema("errorTest", AvroTypeUtil.createSchema(new 
Schema.Parser().parse(newSchema)))
-        runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
+        // failed records output
         runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true")
         runner.assertValid()
+        runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
         runner.run()
 
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
@@ -893,21 +896,57 @@ class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest<PutElastic
         runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "3")
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "4")
+
+        assertEquals(1,
+                runner.getProvenanceEvents().stream().filter({
+                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [3 error(s), 4 
success(es)]"
+                }).count()
+        )
+
+
+        // failed record output grouped by Elasticsearch _bulk error.type
+        runner.clearTransferState()
+        runner.clearProvenanceEvents()
+
+        runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, 
"true")
+        runner.assertValid()
+        runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
2)
+        
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
+
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
+        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+                .getAttribute("elasticsearch.bulk.error"), 
containsString("mapper_parsing_exception"))
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
                 
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
+        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
+                .getAttribute("elasticsearch.bulk.error"), 
containsString("some_other_exception"))
         
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
                 
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "4")
 
         assertEquals(1,
                 runner.getProvenanceEvents().stream().filter({
-                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [1 error(s), 4 
success(es)]"
+                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [3 error(s), 4 
success(es)]"
                 }).count()
         )
 
 
+        // not_found responses treated as failed records
         runner.clearTransferState()
         runner.clearProvenanceEvents()
 
         runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, 
"false")
+        runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, 
"false")
         runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
         runner.assertValid()
         runner.run()
@@ -920,23 +959,63 @@ class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest<PutElastic
         runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
-                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "4")
         
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
                 
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "3")
 
         assertEquals(1,
                 runner.getProvenanceEvents().stream().filter({
-                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3 
success(es)]"
+                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [4 error(s), 3 
success(es)]"
                 }).count()
         )
 
 
+        // not_found failed records grouped as an error.type
         runner.clearTransferState()
         runner.clearProvenanceEvents()
 
+        runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL, 
"false")
+        runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, 
"true")
+        runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
+        runner.assertValid()
+        runner.run()
+
+        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
3)
+        
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
+
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
+        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+                .getAttribute("elasticsearch.bulk.error"), 
containsString("mapper_parsing_exception"))
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
+        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
+                .getAttribute("elasticsearch.bulk.error"), 
containsString("some_other_exception"))
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[2]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
+        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[2]
+                .getAttribute("elasticsearch.bulk.error"), 
containsString("not_found"))
+        
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
+                
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "3")
+
+        assertEquals(1,
+                runner.getProvenanceEvents().stream().filter({
+                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [4 error(s), 3 
success(es)]"
+                }).count()
+        )
+
+
         // errors still counted/logged even if not outputting to the error 
relationship
+        runner.clearTransferState()
+        runner.clearProvenanceEvents()
+
         runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER)
         runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES, 
"true")
+        runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE, 
"false")
         runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
         runner.assertValid()
         runner.run()
@@ -951,10 +1030,12 @@ class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest<PutElastic
         final String errorResponses = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
         assertThat(errorResponses, containsString("not_found"))
         assertThat(errorResponses, containsString("For input string: 20abc"))
+        assertThat(errorResponses, containsString("For input string: 
213,456.9"))
+        assertThat(errorResponses, containsString("For input string: unit 
test"))
 
         assertEquals(1,
                 runner.getProvenanceEvents().stream().filter({
-                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3 
success(es)]"
+                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [4 error(s), 3 
success(es)]"
                 }).count()
         )
     }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
index 0996340448..41b36cda61 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -126,13 +126,45 @@ class MockBulkLoadClientService extends 
AbstractMockElasticsearchClient {
         "status" : 400,
         "error" : {
           "type" : "mapper_parsing_exception",
-          "reason" : "failed to parse field [field2] of type [integer] in 
document with id '4'",
+          "reason" : "failed to parse field [field2] of type [integer] in 
document with id '5'",
           "caused_by" : {
             "type" : "number_format_exception",
             "reason" : "For input string: 20abc"
           }
         }
       }
+    },
+    {
+      "index" : {
+        "_index" : "test",
+        "_type" : "_doc",
+        "_id" : "6",
+        "status" : 400,
+        "error" : {
+          "type" : "mapper_parsing_exception",
+          "reason" : "failed to parse field [field3] of type [geopoint] in 
document with id '6'",
+          "caused_by" : {
+            "type" : "number_format_exception",
+            "reason" : "For input string: 213,456.9"
+          }
+        }
+      }
+    },
+    {
+      "index" : {
+        "_index" : "test",
+        "_type" : "_doc",
+        "_id" : "7",
+        "status" : 400,
+        "error" : {
+          "type" : "some_other_exception",
+          "reason" : "failed to index document with id '7' due to some other 
reason",
+          "caused_by" : {
+            "type" : "random_exception",
+            "reason" : "For input string: unit test"
+          }
+        }
+      }
     }
   ]
 }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 3b23e3c729..9c77306d0c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY;
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the 
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.7.1"));
+            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.8.1"));
     protected static final String ELASTIC_USER_PASSWORD = 
System.getProperty("elasticsearch.elastic_user.password", 
RandomStringUtils.randomAlphanumeric(10, 20));
     private static final int PORT = 9200;
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = 
new ElasticsearchContainer(IMAGE)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 93f94542d8..a0796ddd42 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -103,7 +103,7 @@ language governing permissions and limitations under the 
License. -->
             </activation>
             <properties>
                 <!-- also update the default Elasticsearch version in 
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
-                <elasticsearch_docker_image>8.7.1</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.8.1</elasticsearch_docker_image>
                 
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
             </properties>
             <build>


Reply via email to