This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fb502cdf9a NIFI-11480 add option to group PutElasticsearchRecord
errors by Elasticsearch _bulk error type
fb502cdf9a is described below
commit fb502cdf9a1d6fc3abeb9d9abf182a1d8353f35a
Author: Chris Sampson <[email protected]>
AuthorDate: Mon Jun 26 21:38:31 2023 +0100
NIFI-11480 add option to group PutElasticsearchRecord errors by
Elasticsearch _bulk error type
Signed-off-by: Matt Burgess <[email protected]>
This closes #7441
---
.../elasticsearch/PutElasticsearchRecord.java | 229 +++++++++++++++------
.../elasticsearch/PutElasticsearchJsonTest.groovy | 41 +++-
.../PutElasticsearchRecordTest.groovy | 97 ++++++++-
.../mock/MockBulkLoadClientService.groovy | 34 ++-
.../integration/AbstractElasticsearchITBase.java | 2 +-
nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml | 2 +-
6 files changed, 326 insertions(+), 79 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 6ab1109c12..bde640d8fe 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -36,6 +36,7 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -59,6 +60,7 @@ import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
@@ -79,6 +81,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6",
"elasticsearch7", "elasticsearch8", "put", "index", "record"})
@@ -87,7 +90,8 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttribute(attribute = "elasticsearch.put.error",
description = "The error message if there is an issue parsing
the FlowFile records, sending the parsed documents to Elasticsearch or parsing
the Elasticsearch response."),
@WritesAttribute(attribute = "elasticsearch.put.error.count",
description = "The number of records that generated errors in the Elasticsearch
_bulk API."),
- @WritesAttribute(attribute = "elasticsearch.put.success.count",
description = "The number of records that were successfully processed by the
Elasticsearch _bulk API.")
+ @WritesAttribute(attribute = "elasticsearch.put.success.count",
description = "The number of records that were successfully processed by the
Elasticsearch _bulk API."),
+ @WritesAttribute(attribute = "elasticsearch.bulk.error", description =
"The _bulk response if there was an error during processing the record within
Elasticsearch.")
})
@DynamicProperties({
@DynamicProperty(
@@ -267,7 +271,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
.description("If true, \"not_found\" Elasticsearch Document associated
Records will be routed to the \"" +
REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise
to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " +
"If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is
\"true\" then \"not_found\" responses from Elasticsearch " +
- "will be sent to the " + REL_ERROR_RESPONSES.getName() + "
relationship")
+ "will be sent to the " + REL_ERROR_RESPONSES.getName() + "
relationship.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
@@ -275,6 +279,21 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
.dependsOn(RESULT_RECORD_WRITER)
.build();
+ static final PropertyDescriptor GROUP_BULK_ERRORS_BY_TYPE = new
PropertyDescriptor.Builder()
+ .name("put-es-record-bulk-error-groups")
+ .displayName("Group Results by Bulk Error Type")
+ .description("If this configuration property is set, the response
from Elasticsearch will be examined for _bulk errors. " +
+ "The failed records written to the \"" +
REL_FAILED_RECORDS.getName() + "\" relationship will be grouped by error type "
+
+ "and the error related to the first record within the
FlowFile added to the FlowFile as \"elasticsearch.bulk.error\". " +
+ "If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() +"\" is
\"false\" then records associated with \"not_found\" " +
+ "Elasticsearch document responses will also be send to the
\"" + REL_FAILED_RECORDS.getName() + "\" relationship.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .required(false)
+ .dependsOn(RESULT_RECORD_WRITER)
+ .build();
+
static final PropertyDescriptor DATE_FORMAT = new
PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-date-format")
.displayName("Date Format")
@@ -316,15 +335,22 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER,
BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH,
AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
SCRIPT_RECORD_PATH, SCRIPTED_UPSERT_RECORD_PATH,
DYNAMIC_TEMPLATES_RECORD_PATH, DATE_FORMAT, TIME_FORMAT,
- TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES,
RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL
+ TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES,
RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL,
+ GROUP_BULK_ERRORS_BY_TYPE
));
static final Set<Relationship> BASE_RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS,
REL_SUCCESSFUL_RECORDS
)));
+ private static final String OUTPUT_TYPE_SUCCESS = "success";
+ private static final String OUTPUT_TYPE_ERROR = "error";
+ private static final String OUTPUT_TYPE_UNKNOWN_EXCEPTION =
"unknown_exception";
+ private static final String OUTPUT_TYPE_NOT_FOUND = "not_found";
+
private RecordPathCache recordPathCache;
private RecordReaderFactory readerFactory;
private RecordSetWriterFactory writerFactory;
+ private boolean groupBulkErrors;
private volatile String dateFormat;
private volatile String timeFormat;
@@ -350,6 +376,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
this.writerFactory =
context.getProperty(RESULT_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
this.notFoundIsSuccessful =
context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
+ this.groupBulkErrors =
context.getProperty(GROUP_BULK_ERRORS_BY_TYPE).asBoolean();
this.dateFormat =
context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
if (this.dateFormat == null) {
@@ -482,15 +509,9 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
final BulkOperation bundle = new BulkOperation(operationList,
originals, reader.getSchema());
final ResponseDetails responseDetails = indexDocuments(bundle,
session, input, requestParameters);
- if (responseDetails.getErrored() != null) {
- resultRecords.add(responseDetails.getErrored());
- }
- erroredRecords.getAndAdd(responseDetails.getErrorCount());
-
- if (responseDetails.getSucceeded() != null) {
- resultRecords.add(responseDetails.getSucceeded());
- }
successfulRecords.getAndAdd(responseDetails.getSuccessCount());
+ erroredRecords.getAndAdd(responseDetails.getErrorCount());
+
resultRecords.addAll(responseDetails.getOutputs().values().stream().map(Output::getFlowFile).collect(Collectors.toList()));
operationList.clear();
originals.clear();
@@ -515,54 +536,73 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
final int numErrors = errors.size();
final int numSuccessful = response.getItems() == null ? 0 :
response.getItems().size() - numErrors;
- FlowFile errorFF = null;
- FlowFile successFF = null;
+ final Map<String, Output> outputs = new HashMap<>();
if (writerFactory != null) {
try {
- successFF = session.create(input);
- errorFF = session.create(input);
-
- try (final OutputStream errorOutputStream =
session.write(errorFF);
- final RecordSetWriter errorWriter =
writerFactory.createWriter(getLogger(), bundle.getSchema(), errorOutputStream,
errorFF);
- final OutputStream successOutputStream =
session.write(successFF);
- final RecordSetWriter successWriter =
writerFactory.createWriter(getLogger(), bundle.getSchema(),
successOutputStream, successFF)) {
-
- errorWriter.beginRecordSet();
- successWriter.beginRecordSet();
- for (int o = 0; o < bundle.getOriginalRecords().size();
o++) {
- if (errors.containsKey(o)) {
-
errorWriter.write(bundle.getOriginalRecords().get(o));
+ for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
+ final String type;
+ final Relationship relationship;
+ final Map<String, Object> error;
+ if (errors.containsKey(o)) {
+ relationship = REL_FAILED_RECORDS;
+ error = errors.get(o);
+ if (groupBulkErrors) {
+ if (isElasticsearchNotFound().test(error)) {
+ type = OUTPUT_TYPE_NOT_FOUND;
+ } else {
+ type = getErrorType(error);
+ }
} else {
-
successWriter.write(bundle.getOriginalRecords().get(o));
+ type = OUTPUT_TYPE_ERROR;
}
+ } else {
+ relationship = REL_SUCCESSFUL_RECORDS;
+ error = null;
+ type = OUTPUT_TYPE_SUCCESS;
}
- errorWriter.finishRecordSet();
- successWriter.finishRecordSet();
+ final Output output = getOutputByType(outputs, type,
session, relationship, input, bundle.getSchema());
+ output.write(bundle.getOriginalRecords().get(o), error);
}
- if (numErrors > 0) {
- errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT,
String.valueOf(numErrors));
- session.transfer(errorFF, REL_FAILED_RECORDS);
- } else {
- session.remove(errorFF);
- }
-
- if (numSuccessful > 0) {
- successFF = session.putAttribute(successFF,
ATTR_RECORD_COUNT, String.valueOf(numSuccessful));
- session.transfer(successFF, REL_SUCCESSFUL_RECORDS);
- } else {
- session.remove(successFF);
+ for (final Output output : outputs.values()) {
+ output.transfer(session);
}
} catch (final IOException | SchemaNotFoundException ex) {
getLogger().error("Unable to write error/successful records",
ex);
- session.remove(errorFF);
- session.remove(successFF);
+ outputs.values().forEach(o -> {
+ try {
+ o.remove(session);
+ } catch (IOException ioe) {
+ getLogger().warn("Error closing RecordSetWriter for
FlowFile", ioe);
+ }
+ });
throw ex;
}
}
- return new ResponseDetails(successFF, numSuccessful, errorFF,
numErrors);
+ return new ResponseDetails(outputs, numSuccessful, numErrors);
+ }
+
+ @SuppressWarnings("unchecked")
+ private String getErrorType(final Map<String, Object> errorInner) {
+ try {
+ return (String) ((Map<String, Object>)
errorInner.get("error")).get("type");
+ } catch (Exception ex) {
+ return OUTPUT_TYPE_UNKNOWN_EXCEPTION;
+ }
+ }
+
+ private Output getOutputByType(final Map<String, Output> outputs, final
String type, final ProcessSession session,
+ final Relationship relationship, final
FlowFile input, final RecordSchema schema)
+ throws IOException, SchemaNotFoundException{
+ Output output = outputs.get(type);
+ if (output == null) {
+ output = new Output(session, writerFactory, getLogger(), schema,
input, relationship,
+ !OUTPUT_TYPE_ERROR.equals(type) &&
!OUTPUT_TYPE_SUCCESS.equals(type) ? type : null);
+ outputs.put(type, output);
+ }
+ return output;
}
private void formatDateTimeFields(final Map<String, Object> contentMap,
final Record record) {
@@ -706,6 +746,30 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
}
}
+ private static class ResponseDetails {
+ final Map<String, Output> outputs;
+ final int errorCount;
+ final int successCount;
+
+ ResponseDetails(final Map<String, Output> outputs, final int
successCount, final int errorCount) {
+ this.outputs = outputs;
+ this.successCount = successCount;
+ this.errorCount = errorCount;
+ }
+
+ public Map<String, Output> getOutputs() {
+ return outputs;
+ }
+
+ public int getErrorCount() {
+ return errorCount;
+ }
+
+ public int getSuccessCount() {
+ return successCount;
+ }
+ }
+
private String determineDateFormat(final RecordFieldType recordFieldType) {
final String format;
switch (recordFieldType) {
@@ -730,33 +794,74 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
: stringValue;
}
- private static class ResponseDetails {
- final FlowFile errored;
- final FlowFile succeeded;
- final int errorCount;
- final int successCount;
+ private static class Output {
+ private FlowFile flowFile;
+ private final RecordSetWriter writer;
+ private final Relationship relationship;
+ private final String errorType;
- ResponseDetails(final FlowFile succeeded, final int successCount,
final FlowFile errored, final int errorCount) {
- this.succeeded = succeeded;
- this.successCount = successCount;
- this.errored = errored;
- this.errorCount = errorCount;
+ private String exampleError;
+
+ private int numRecords;
+
+ public Output(final ProcessSession session, final
RecordSetWriterFactory writerFactory, final ComponentLog logger,
+ final RecordSchema schema, final FlowFile input, final
Relationship relationship, final String errorType)
+ throws IOException, SchemaNotFoundException {
+ this.flowFile = session.create(input);
+ this.relationship = relationship;
+ this.errorType = errorType;
+
+ final OutputStream outputStream = session.write(this.flowFile);
+ this.writer = writerFactory.createWriter(logger, schema,
outputStream, this.flowFile);
+ this.writer.beginRecordSet();
+ }
+
+ public FlowFile getFlowFile() {
+ return flowFile;
}
- public FlowFile getSucceeded() {
- return succeeded;
+ public void write(final Record record, final Map<String, Object>
error) throws IOException {
+ numRecords++;
+ this.writer.write(record);
+ if (errorType != null && exampleError == null && error != null) {
+ try {
+ exampleError = MAPPER.writeValueAsString(error);
+ } catch (JsonProcessingException e) {
+ exampleError = String.format(
+ "{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+ e.getMessage().replace("\"", "\\\"")
+ );
+ }
+ }
}
- public FlowFile getErrored() {
- return errored;
+ private void close() throws IOException {
+ if (this.writer != null) {
+ this.writer.close();
+ }
}
- public int getErrorCount() {
- return errorCount;
+ public void transfer(final ProcessSession session) throws IOException {
+ writer.finishRecordSet();
+ this.close();
+
+ if (numRecords > 0) {
+ final Map<String, String> attributes = new HashMap<>(2, 1);
+ attributes.put(ATTR_RECORD_COUNT, String.valueOf(numRecords));
+ if (errorType != null) {
+ attributes.put("elasticsearch.bulk.error", exampleError);
+ }
+ this.flowFile = session.putAllAttributes(this.flowFile,
attributes);
+
+ session.transfer(this.flowFile, this.relationship);
+ } else {
+ session.remove(this.flowFile);
+ }
}
- public int getSuccessCount() {
- return successCount;
+ public void remove(final ProcessSession session) throws IOException {
+ this.close();
+ session.remove(this.flowFile);
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
index bcb31dd9bd..a85fa293f7 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
@@ -316,8 +316,10 @@ class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest<PutElasticse
[ id: "1", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
- [ id: "3", field1: 'value1', field2: 'not_found' ],
- [ id: "4", field1: 'value1', field2: '20abcd' ]
+ [ id: "4", field1: 'value1', field2: 'not_found' ],
+ [ id: "5", field1: 'value1', field2: '20abcd' ],
+ [ id: "6", field1: 'value1', field2: '213,456.9' ],
+ [ id: "7", field1: 'value1', field2: 'unit test' ]
]
for (final def val : values) {
@@ -329,7 +331,7 @@ class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest<PutElasticse
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
- runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
1)
+ runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
3)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
def failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
@@ -337,7 +339,20 @@ class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest<PutElasticse
failedDoc.assertAttributeExists("elasticsearch.bulk.error")
failedDoc.assertAttributeNotExists("elasticsearch.put.error")
assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("mapper_parsing_exception"))
- assertEquals(1,
+
+ failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1]
+ assertThat(failedDoc.getContent(), containsString("213,456.9"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("mapper_parsing_exception"))
+
+ failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[2]
+ assertThat(failedDoc.getContent(), containsString("unit test"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("some_other_exception"))
+
+ assertEquals(3,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() &&
"Elasticsearch _bulk operation error" == e.getDetails()
}).count()
@@ -359,7 +374,7 @@ class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest<PutElasticse
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 3)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
- runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
2)
+ runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
4)
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
1)
failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
@@ -374,11 +389,25 @@ class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest<PutElasticse
failedDoc.assertAttributeNotExists("elasticsearch.put.error")
assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("number_format_exception"))
+ failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[2]
+ assertThat(failedDoc.getContent(), containsString("213,456.9"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("mapper_parsing_exception"))
+
+ failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[3]
+ assertThat(failedDoc.getContent(), containsString("unit test"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("some_other_exception"))
+
final String errorResponses =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
assertThat(errorResponses, containsString("not_found"))
assertThat(errorResponses, containsString("For input string: 20abc"))
+ assertThat(errorResponses, containsString("For input string:
213,456.9"))
+ assertThat(errorResponses, containsString("For input string: unit
test"))
- assertEquals(2,
+ assertEquals(4,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() &&
"Elasticsearch _bulk operation error" == e.getDetails()
}).count()
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index f3992d79da..5378a8a43f 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -868,21 +868,24 @@ class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest<PutElastic
[ name: "field2", type: "string"]
]
]))
+ registry.addSchema("errorTest", AvroTypeUtil.createSchema(new
Schema.Parser().parse(newSchema)))
def values = [
[ id: "1", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
- [ id: "3", field1: 'value1', field2: 'not_found' ],
- [ id: "4", field1: 'value1', field2: '20abcd' ]
+ [ id: "4", field1: 'value1', field2: 'not_found' ],
+ [ id: "5", field1: 'value1', field2: '20abcd' ],
+ [ id: "6", field1: 'value1', field2: '213,456.9' ],
+ [ id: "7", field1: 'value1', field2: 'unit test' ]
]
clientService.response =
IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
- registry.addSchema("errorTest", AvroTypeUtil.createSchema(new
Schema.Parser().parse(newSchema)))
- runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
+ // failed records output
runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true")
runner.assertValid()
+ runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
@@ -893,21 +896,57 @@ class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest<PutElastic
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "3")
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "4")
+
+ assertEquals(1,
+ runner.getProvenanceEvents().stream().filter({
+ e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [3 error(s), 4
success(es)]"
+ }).count()
+ )
+
+
+ // failed record output grouped by Elasticsearch _bulk error.type
+ runner.clearTransferState()
+ runner.clearProvenanceEvents()
+
+ runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE,
"true")
+ runner.assertValid()
+ runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
+ runner.run()
+
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
2)
+
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
+
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
+
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+ .getAttribute("elasticsearch.bulk.error"),
containsString("mapper_parsing_exception"))
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
+
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
+ .getAttribute("elasticsearch.bulk.error"),
containsString("some_other_exception"))
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "4")
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
- e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [1 error(s), 4
success(es)]"
+ e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [3 error(s), 4
success(es)]"
}).count()
)
+ // not_found responses treated as failed records
runner.clearTransferState()
runner.clearProvenanceEvents()
runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL,
"false")
+ runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE,
"false")
runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
runner.assertValid()
runner.run()
@@ -920,23 +959,63 @@ class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest<PutElastic
runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
-
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "4")
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "3")
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
- e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3
success(es)]"
+ e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [4 error(s), 3
success(es)]"
}).count()
)
+ // not_found failed records grouped as an error.type
runner.clearTransferState()
runner.clearProvenanceEvents()
+ runner.setProperty(PutElasticsearchRecord.NOT_FOUND_IS_SUCCESSFUL,
"false")
+ runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE,
"true")
+ runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
+ runner.assertValid()
+ runner.run()
+
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
3)
+
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
+
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
+
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
+ .getAttribute("elasticsearch.bulk.error"),
containsString("mapper_parsing_exception"))
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
+
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[1]
+ .getAttribute("elasticsearch.bulk.error"),
containsString("some_other_exception"))
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[2]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
+
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[2]
+ .getAttribute("elasticsearch.bulk.error"),
containsString("not_found"))
+
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS)[0]
+
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "3")
+
+ assertEquals(1,
+ runner.getProvenanceEvents().stream().filter({
+ e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [4 error(s), 3
success(es)]"
+ }).count()
+ )
+
+
// errors still counted/logged even if not outputting to the error
relationship
+ runner.clearTransferState()
+ runner.clearProvenanceEvents()
+
runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER)
runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES,
"true")
+ runner.setProperty(PutElasticsearchRecord.GROUP_BULK_ERRORS_BY_TYPE,
"false")
runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
runner.assertValid()
runner.run()
@@ -951,10 +1030,12 @@ class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest<PutElastic
final String errorResponses =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
assertThat(errorResponses, containsString("not_found"))
assertThat(errorResponses, containsString("For input string: 20abc"))
+ assertThat(errorResponses, containsString("For input string:
213,456.9"))
+ assertThat(errorResponses, containsString("For input string: unit
test"))
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
- e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3
success(es)]"
+ e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [4 error(s), 3
success(es)]"
}).count()
)
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
index 0996340448..41b36cda61 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy
@@ -126,13 +126,45 @@ class MockBulkLoadClientService extends
AbstractMockElasticsearchClient {
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
- "reason" : "failed to parse field [field2] of type [integer] in
document with id '4'",
+ "reason" : "failed to parse field [field2] of type [integer] in
document with id '5'",
"caused_by" : {
"type" : "number_format_exception",
"reason" : "For input string: 20abc"
}
}
}
+ },
+ {
+ "index" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "6",
+ "status" : 400,
+ "error" : {
+ "type" : "mapper_parsing_exception",
+ "reason" : "failed to parse field [field3] of type [geopoint] in
document with id '6'",
+ "caused_by" : {
+ "type" : "number_format_exception",
+ "reason" : "For input string: 213,456.9"
+ }
+ }
+ }
+ },
+ {
+ "index" : {
+ "_index" : "test",
+ "_type" : "_doc",
+ "_id" : "7",
+ "status" : 400,
+ "error" : {
+ "type" : "some_other_exception",
+ "reason" : "failed to index document with id '7' due to some other
reason",
+ "caused_by" : {
+ "type" : "random_exception",
+ "reason" : "For input string: unit test"
+ }
+ }
+ }
}
]
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 3b23e3c729..9c77306d0c 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.7.1"));
+ .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.8.1"));
protected static final String ELASTIC_USER_PASSWORD =
System.getProperty("elasticsearch.elastic_user.password",
RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
new ElasticsearchContainer(IMAGE)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 571a1fbb3d..1511d4498f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -101,7 +101,7 @@ language governing permissions and limitations under the
License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
- <elasticsearch_docker_image>8.7.1</elasticsearch_docker_image>
+ <elasticsearch_docker_image>8.8.1</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>