This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new c45b841392 NIFI-11111 add option to output Elasticsearch error
responses as FlowFile to PutElasticsearchJson and PutElasticsearchRecord
NIFI-11111 clarify error_responses relationships in PutElasticsearchJson/Record
processors NIFI-11111 Refactor exception handling for error response flowfile
transfer NIFI-11111 Add elasticsearch.bulk.error attributes containing the
Elasticsearch _bulk response for error documents in PutElasticsearchJson
c45b841392 is described below
commit c45b841392cc8671fe504374540eb5cca23903ff
Author: Chris Sampson <[email protected]>
AuthorDate: Sun Jan 29 19:32:27 2023 +0000
NIFI-11111 add option to output Elasticsearch error responses as FlowFile
to PutElasticsearchJson and PutElasticsearchRecord
NIFI-11111 clarify error_responses relationships in
PutElasticsearchJson/Record processors
NIFI-11111 Refactor exception handling for error response flowfile transfer
NIFI-11111 Add elasticsearch.bulk.error attributes containing the
Elasticsearch _bulk response for error documents in PutElasticsearchJson
This closes #6903
Signed-off-by: Mike Thomsen <[email protected]>
---
.../elasticsearch/AbstractPutElasticsearch.java | 104 +++++++++++++++++----
.../elasticsearch/PutElasticsearchJson.java | 67 +++++++------
.../elasticsearch/PutElasticsearchRecord.java | 41 ++++----
.../AbstractPutElasticsearchTest.groovy | 50 ++++++++++
.../elasticsearch/PutElasticsearchJsonTest.groovy | 51 ++++++++--
.../PutElasticsearchRecordTest.groovy | 42 +++++++--
.../integration/AbstractElasticsearchITBase.java | 2 +-
nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml | 2 +-
8 files changed, 273 insertions(+), 86 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index af12d18102..0ab0a504f7 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -37,12 +36,17 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
@@ -67,6 +71,16 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
.required(true)
.build();
+ static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new
PropertyDescriptor.Builder()
+ .name("put-es-output-error-responses")
+ .displayName("Output Error Responses")
+ .description("If this is enabled, response messages from
Elasticsearch marked as \"error\" will be output to the \"error_responses\"
relationship." +
+ "This does not impact the output of flowfiles to the
\"success\" or \"errors\" relationships")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All flowfiles that succeed in being transferred into
Elasticsearch go here. " +
@@ -74,6 +88,12 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
"The Elasticsearch response will need to be examined to
determine whether any Document(s)/Record(s) resulted in errors.")
.build();
+ static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder()
+ .name("error_responses")
+ .description("Elasticsearch _bulk API responses marked as
\"error\" go here " +
+ "(and optionally \"not_found\" when \"Treat \"Not Found\"
as Error\" is \"true\").")
+ .build();
+
static final List<String> ALLOWED_INDEX_OPERATIONS =
Collections.unmodifiableList(Arrays.asList(
IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
@@ -82,12 +102,22 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
));
+ private final AtomicReference<Set<Relationship>> relationships = new
AtomicReference<>(getBaseRelationships());
+
boolean logErrors;
+ boolean outputErrorResponses;
boolean notFoundIsSuccessful;
ObjectMapper errorMapper;
final AtomicReference<ElasticSearchClientService> clientService = new
AtomicReference<>(null);
+ abstract Set<Relationship> getBaseRelationships();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships.get();
+ }
+
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
@@ -99,6 +129,17 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
.build();
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ if (OUTPUT_ERROR_RESPONSES.equals(descriptor)) {
+ final Set<Relationship> newRelationships = new
HashSet<>(getBaseRelationships());
+ if (Boolean.parseBoolean(newValue)) {
+ newRelationships.add(REL_ERROR_RESPONSES);
+ }
+ relationships.set(newRelationships);
+ }
+ }
+
@Override
public boolean isIndexNotExistSuccessful() {
// index can be created during _bulk index/create operation
@@ -110,8 +151,9 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+ this.outputErrorResponses =
context.getProperty(OUTPUT_ERROR_RESPONSES).asBoolean();
- if (errorMapper == null && (logErrors ||
getLogger().isDebugEnabled())) {
+ if (errorMapper == null && (outputErrorResponses || logErrors ||
getLogger().isDebugEnabled())) {
errorMapper = new ObjectMapper();
errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
}
@@ -158,15 +200,35 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
}
}
- void logElasticsearchDocumentErrors(final IndexOperationResponse response)
throws JsonProcessingException {
- if (logErrors || getLogger().isDebugEnabled()) {
- final List<Map<String, Object>> errors = response.getItems();
- final String output = String.format("An error was encountered
while processing bulk operations. Server response below:%n%n%s",
errorMapper.writeValueAsString(errors));
+ void handleElasticsearchDocumentErrors(final Map<Integer, Map<String,
Object>> errors, final ProcessSession session, final FlowFile parent) throws
IOException {
+ if (!errors.isEmpty() && (outputErrorResponses || logErrors ||
getLogger().isDebugEnabled())) {
+ if (logErrors || getLogger().isDebugEnabled()) {
+ final String output = String.format(
+ "An error was encountered while processing bulk
operations. Server response below:%n%n%s",
+ errorMapper.writeValueAsString(errors.values())
+ );
- if (logErrors) {
- getLogger().error(output);
- } else {
- getLogger().debug(output);
+ if (logErrors) {
+ getLogger().error(output);
+ } else {
+ getLogger().debug(output);
+ }
+ }
+
+ if (outputErrorResponses) {
+ FlowFile errorResponsesFF = null;
+ try {
+ errorResponsesFF = session.create(parent);
+ try (final OutputStream errorsOutputStream =
session.write(errorResponsesFF)) {
+ errorMapper.writeValue(errorsOutputStream,
errors.values());
+ }
+ errorResponsesFF = session.putAttribute(errorResponsesFF,
"elasticsearch.put.error.count", String.valueOf(errors.size()));
+ session.transfer(errorResponsesFF, REL_ERROR_RESPONSES);
+ } catch (final IOException ex) {
+ getLogger().error("Unable to write error responses", ex);
+ session.remove(errorResponsesFF);
+ throw ex;
+ }
}
}
}
@@ -179,21 +241,29 @@ public abstract class AbstractPutElasticsearch extends
AbstractProcessor impleme
return inner -> inner.containsKey("result") &&
"not_found".equals(inner.get("result"));
}
- @SafeVarargs
- final List<Integer> findElasticsearchResponseIndices(final
IndexOperationResponse response, final Predicate<Map<String, Object>>...
responseItemFilter) {
- final List<Integer> indices = new ArrayList<>(response.getItems() ==
null ? 0 : response.getItems().size());
- if (response.getItems() != null) {
+ final Map<Integer, Map<String, Object>>
findElasticsearchResponseErrors(final IndexOperationResponse response) {
+ final Map<Integer, Map<String, Object>> errors = new
LinkedHashMap<>(response.getItems() == null ? 0 : response.getItems().size(),
1);
+
+ final List<Predicate<Map<String, Object>>> errorItemFilters = new
ArrayList<>(2);
+ if (response.hasErrors()) {
+ errorItemFilters.add(isElasticsearchError());
+ }
+ if (!notFoundIsSuccessful) {
+ errorItemFilters.add(isElasticsearchNotFound());
+ }
+
+ if (response.getItems() != null && !errorItemFilters.isEmpty()) {
for (int index = 0; index < response.getItems().size(); index++) {
final Map<String, Object> current =
response.getItems().get(index);
if (!current.isEmpty()) {
final String key =
current.keySet().stream().findFirst().orElse(null);
@SuppressWarnings("unchecked") final Map<String, Object>
inner = (Map<String, Object>) current.get(key);
- if (inner != null &&
Arrays.stream(responseItemFilter).anyMatch(p -> p.test(inner))) {
- indices.add(index);
+ if (inner != null && errorItemFilters.stream().anyMatch(p
-> p.test(inner))) {
+ errors.put(index, inner);
}
}
}
}
- return indices;
+ return errors;
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
index aac7e00d1f..f17f015813 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -51,14 +51,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6",
"elasticsearch7", "elasticsearch8", "put", "index"})
@CapabilityDescription("An Elasticsearch put processor that uses the official
Elastic REST client libraries.")
@WritesAttributes({
- @WritesAttribute(attribute = "elasticsearch.put.error", description =
"The error message provided by Elasticsearch if there is an error indexing the
document.")
+ @WritesAttribute(attribute = "elasticsearch.put.error",
+ description = "The error message if there is an issue parsing
the FlowFile, sending the parsed document to Elasticsearch or parsing the
Elasticsearch response"),
+ @WritesAttribute(attribute = "elasticsearch.bulk.error", description =
"The _bulk response if there was an error during processing the document within
Elasticsearch.")
})
@DynamicProperty(
name = "The name of a URL query parameter to add",
@@ -101,7 +102,8 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
.name("put-es-json-error-documents")
.displayName("Output Error Documents")
.description("If this configuration property is true, the response
from Elasticsearch will be examined for failed documents " +
- "and the FlowFile(s) associated with the failed document(s)
will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship.")
+ "and the FlowFile(s) associated with the failed document(s)
will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship " +
+ "with \"elasticsearch.bulk.error\" attributes.")
.allowableValues("true", "false")
.defaultValue("false")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -110,9 +112,11 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new
PropertyDescriptor.Builder()
.name("put-es-json-not_found-is-error")
- .displayName("Treat \"Not Found\" as Error")
+ .displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated
FlowFiles will be routed to the \"" + REL_SUCCESS.getName() +
- "\" relationship, otherwise to the \"" +
REL_FAILED_DOCUMENTS.getName() + "\" relationship.")
+ "\" relationship, otherwise to the \"" +
REL_FAILED_DOCUMENTS.getName() + "\" relationship. " +
+ "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is
\"true\" then \"not_found\" responses from Elasticsearch " +
+ "will be sent to the " + REL_ERROR_RESPONSES.getName() + "
relationship")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
@@ -121,19 +125,19 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
.build();
static final List<PropertyDescriptor> DESCRIPTORS =
Collections.unmodifiableList(Arrays.asList(
- ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET,
CLIENT_SERVICE, LOG_ERROR_RESPONSES,
- OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL
+ ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET,
CLIENT_SERVICE,
+ LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS,
NOT_FOUND_IS_SUCCESSFUL
));
- static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
- REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
+ static final Set<Relationship> BASE_RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
)));
private boolean outputErrors;
- private final ObjectMapper inputMapper = new ObjectMapper();
+ private final ObjectMapper objectMapper = new ObjectMapper();
@Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
+ Set<Relationship> getBaseRelationships() {
+ return BASE_RELATIONSHIPS;
}
@Override
@@ -141,6 +145,7 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
return DESCRIPTORS;
}
+ @Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
@@ -174,7 +179,7 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
try (final InputStream inStream = session.read(input)) {
final byte[] result = IOUtils.toByteArray(inStream);
@SuppressWarnings("unchecked")
- final Map<String, Object> contentMap =
inputMapper.readValue(new String(result, charset), Map.class);
+ final Map<String, Object> contentMap =
objectMapper.readValue(new String(result, charset), Map.class);
final IndexOperationRequest.Operation o =
IndexOperationRequest.Operation.forValue(indexOp);
operations.add(new IndexOperationRequest(index, type, id,
contentMap, o));
@@ -195,7 +200,7 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
if (!originals.isEmpty()) {
try {
- final List<FlowFile> errorDocuments =
indexDocuments(operations, originals, context);
+ final List<FlowFile> errorDocuments =
indexDocuments(operations, originals, context, session);
session.transfer(errorDocuments, REL_FAILED_DOCUMENTS);
errorDocuments.forEach(e ->
session.getProvenanceReporter().send(
@@ -239,26 +244,28 @@ public class PutElasticsearchJson extends
AbstractPutElasticsearch {
}
}
- @SuppressWarnings("unchecked")
- private List<FlowFile> indexDocuments(final List<IndexOperationRequest>
operations, final List<FlowFile> originals, final ProcessContext context)
throws JsonProcessingException {
+ private List<FlowFile> indexDocuments(final List<IndexOperationRequest>
operations, final List<FlowFile> originals, final ProcessContext context, final
ProcessSession session) throws IOException {
final IndexOperationResponse response =
clientService.get().bulk(operations, getUrlQueryParameters(context,
originals.get(0)));
- final List<FlowFile> errorDocuments = new
ArrayList<>(response.getItems() == null ? 0 : response.getItems().size());
-
- List<Predicate<Map<String, Object>>> errorItemFilters = new
ArrayList<>(2);
- if (response.hasErrors()) {
- logElasticsearchDocumentErrors(response);
- if (outputErrors) {
- errorItemFilters.add(isElasticsearchError());
- }
+ final Map<Integer, Map<String, Object>> errors =
findElasticsearchResponseErrors(response);
+ final List<FlowFile> errorDocuments = outputErrors ? new
ArrayList<>(errors.size()) : Collections.emptyList();
+ if (outputErrors) {
+ errors.forEach((index, error) -> {
+ String errorMessage;
+ try {
+ errorMessage = objectMapper.writeValueAsString(error);
+ } catch (JsonProcessingException e) {
+ errorMessage = String.format(
+ "{\"error\": {\"type\":
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+ e.getMessage().replace("\"", "\\\"")
+ );
+ }
+ errorDocuments.add(session.putAttribute(originals.get(index),
"elasticsearch.bulk.error", errorMessage));
+ });
}
- if (!notFoundIsSuccessful) {
- errorItemFilters.add(isElasticsearchNotFound());
- }
- if (!errorItemFilters.isEmpty()) {
- findElasticsearchResponseIndices(response,
errorItemFilters.toArray(new Predicate[0]))
- .forEach(index ->
errorDocuments.add(originals.get((Integer) index)));
+ if (!errors.isEmpty()) {
+ handleElasticsearchDocumentErrors(errors, session, null);
}
return errorDocuments;
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index d70a615915..13fb557756 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -76,13 +76,13 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Predicate;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6",
"elasticsearch7", "elasticsearch8", "put", "index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses
the official Elastic REST client libraries.")
@WritesAttributes({
- @WritesAttribute(attribute = "elasticsearch.put.error", description =
"The error message provided by Elasticsearch if there is an error indexing the
documents."),
+ @WritesAttribute(attribute = "elasticsearch.put.error",
+ description = "The error message if there is an issue parsing
the FlowFile records, sending the parsed documents to Elasticsearch or parsing
the Elasticsearch response."),
@WritesAttribute(attribute = "elasticsearch.put.error.count",
description = "The number of records that generated errors in the Elasticsearch
_bulk API."),
@WritesAttribute(attribute = "elasticsearch.put.success.count",
description = "The number of records that were successfully processed by the
Elasticsearch _bulk API.")
})
@@ -207,7 +207,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
.displayName("Result Record Writer")
.description("If this configuration property is set, the response from
Elasticsearch will be examined for failed records " +
"and the failed records will be written to a record set with
this record writer service and sent to the \"" +
- REL_FAILED_RECORDS.getName() + "\" relationship. Successful
records will be written to a record set" +
+ REL_FAILED_RECORDS.getName() + "\" relationship. Successful
records will be written to a record set " +
"with this record writer service and sent to the \"" +
REL_SUCCESSFUL_RECORDS.getName() + "\" relationship.")
.identifiesControllerService(RecordSetWriterFactory.class)
.addValidator(Validator.VALID)
@@ -216,9 +216,11 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new
PropertyDescriptor.Builder()
.name("put-es-record-not_found-is-error")
- .displayName("Treat \"Not Found\" as Error")
+ .displayName("Treat \"Not Found\" as Success")
.description("If true, \"not_found\" Elasticsearch Document associated
Records will be routed to the \"" +
- REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise
to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.")
+ REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise
to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " +
+ "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is
\"true\" then \"not_found\" responses from Elasticsearch " +
+ "will be sent to the " + REL_ERROR_RESPONSES.getName() + "
relationship")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
@@ -266,10 +268,11 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
static final List<PropertyDescriptor> DESCRIPTORS =
Collections.unmodifiableList(Arrays.asList(
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER,
BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH,
AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
- DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL
+ DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
OUTPUT_ERROR_RESPONSES, RESULT_RECORD_WRITER,
+ NOT_FOUND_IS_SUCCESSFUL
));
- static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
- REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS,
REL_SUCCESSFUL_RECORDS
+ static final Set<Relationship> BASE_RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS,
REL_SUCCESSFUL_RECORDS
)));
private RecordPathCache recordPathCache;
@@ -281,8 +284,8 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
private volatile String timestampFormat;
@Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
+ Set<Relationship> getBaseRelationships() {
+ return BASE_RELATIONSHIPS;
}
@Override
@@ -290,6 +293,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
return DESCRIPTORS;
}
+ @Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
@@ -460,19 +464,12 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
private ResponseDetails indexDocuments(final BulkOperation bundle, final
ProcessContext context, final ProcessSession session, final FlowFile input)
throws IOException, SchemaNotFoundException {
final IndexOperationResponse response =
clientService.get().bulk(bundle.getOperationList(),
getUrlQueryParameters(context, input));
- List<Predicate<Map<String, Object>>> errorItemFilters = new
ArrayList<>(2);
- if (response.hasErrors()) {
- logElasticsearchDocumentErrors(response);
- errorItemFilters.add(isElasticsearchError());
- }
-
- if (writerFactory != null && !notFoundIsSuccessful) {
- errorItemFilters.add(isElasticsearchNotFound());
+ final Map<Integer, Map<String, Object>> errors =
findElasticsearchResponseErrors(response);
+ if (!errors.isEmpty()) {
+ handleElasticsearchDocumentErrors(errors, session, input);
}
- @SuppressWarnings("unchecked")
- final List<Integer> errorIndices =
findElasticsearchResponseIndices(response, errorItemFilters.toArray(new
Predicate[0]));
- final int numErrors = errorIndices.size();
+ final int numErrors = errors.size();
final int numSuccessful = response.getItems() == null ? 0 :
response.getItems().size() - numErrors;
FlowFile errorFF = null;
FlowFile successFF = null;
@@ -490,7 +487,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
errorWriter.beginRecordSet();
successWriter.beginRecordSet();
for (int o = 0; o < bundle.getOriginalRecords().size();
o++) {
- if (errorIndices.contains(o)) {
+ if (errors.containsKey(o)) {
errorWriter.write(bundle.getOriginalRecords().get(o));
} else {
successWriter.write(bundle.getOriginalRecords().get(o));
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy
new file mode 100644
index 0000000000..b287d42298
--- /dev/null
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.jupiter.api.Test
+
+import static org.hamcrest.CoreMatchers.hasItem
+import static org.hamcrest.CoreMatchers.not
+import static org.hamcrest.MatcherAssert.assertThat
+
+abstract class AbstractPutElasticsearchTest<P extends
AbstractPutElasticsearch> {
+ abstract P getProcessor()
+
+ @Test
+ void testOutputErrorResponsesRelationship() {
+ final TestRunner runner = createRunner()
+
+ assertThat(runner.getProcessor().getRelationships(),
not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)))
+
+ runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES,
"true")
+ assertThat(runner.getProcessor().getRelationships(),
hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES))
+
+ runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES,
"false")
+ assertThat(runner.getProcessor().getRelationships(),
not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)))
+ }
+
+ TestRunner createRunner() {
+ final P processor = getProcessor()
+ final TestRunner runner = TestRunners.newTestRunner(processor)
+
+ return runner
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
index 21068df443..e3c371eea8 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
@@ -22,20 +22,18 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.TestRunner
-import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
-
import static org.hamcrest.CoreMatchers.containsString
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
-class PutElasticsearchJsonTest {
+class PutElasticsearchJsonTest extends
AbstractPutElasticsearchTest<PutElasticsearchJson> {
MockBulkLoadClientService clientService
TestRunner runner
@@ -43,10 +41,15 @@ class PutElasticsearchJsonTest {
[ msg: "Hello, world", from: "john.smith" ]
))
+ @Override
+ PutElasticsearchJson getProcessor() {
+ return new PutElasticsearchJson()
+ }
+
@BeforeEach
void setup() {
clientService = new MockBulkLoadClientService()
- runner = TestRunners.newTestRunner(PutElasticsearchJson.class)
+ runner = createRunner()
clientService.response = new IndexOperationResponse(1500)
@@ -60,6 +63,7 @@ class PutElasticsearchJsonTest {
runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE,
"clientService")
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL,
"true")
+ runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES,
"false")
runner.enableControllerService(clientService)
runner.assertValid()
@@ -98,6 +102,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry)
runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
assertEquals(success,
runner.getProvenanceEvents().stream().filter({
@@ -203,6 +208,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -212,6 +218,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -241,7 +248,13 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
1)
-
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(),
containsString("20abcd"))
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
+
+ def failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
+ assertThat(failedDoc.getContent(), containsString("20abcd"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("mapper_parsing_exception"))
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() &&
"Elasticsearch _bulk operation error" == e.getDetails()
@@ -253,6 +266,7 @@ class PutElasticsearchJsonTest {
runner.clearProvenanceEvents()
runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL,
"false")
+ runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "true")
for (final def val : values) {
runner.enqueue(prettyPrint(toJson(val)))
@@ -264,8 +278,24 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
2)
-
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(),
containsString("not_found"))
-
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1].getContent(),
containsString("20abcd"))
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
1)
+
+ failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
+ assertThat(failedDoc.getContent(), containsString("not_found"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("not_found"))
+
+ failedDoc =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1];
+ assertThat(failedDoc.getContent(), containsString("20abcd"))
+ failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+ failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+ assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"),
containsString("number_format_exception"))
+
+ final String errorResponses =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
+ assertThat(errorResponses, containsString("not_found"))
+ assertThat(errorResponses, containsString("For input string: 20abc"))
+
assertEquals(2,
runner.getProvenanceEvents().stream().filter({
e -> ProvenanceEventType.SEND == e.getEventType() &&
"Elasticsearch _bulk operation error" == e.getDetails()
@@ -285,7 +315,8 @@ class PutElasticsearchJsonTest {
[ id: "1", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
[ id: "2", field1: 'value1', field2: '20' ],
- [ id: "3", field1: 'value1', field2: '20abcd' ]
+ [ id: "3", field1: 'value1', field2: '20abcd' ],
+ [ id: "4", field1: 'value2', field2: '30' ]
]
for (final def val : values) {
@@ -294,10 +325,11 @@ class PutElasticsearchJsonTest {
runner.assertValid()
runner.run()
- runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
+ runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 5)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -309,6 +341,7 @@ class PutElasticsearchJsonTest {
runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS,
0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE)[0].assertAttributeEquals(
"elasticsearch.put.error",
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 5e976a31d3..085844be3a 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -32,7 +32,6 @@ import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
-import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -46,9 +45,14 @@ import java.time.format.DateTimeFormatter
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
-import static org.junit.jupiter.api.Assertions.*
-
-class PutElasticsearchRecordTest {
+import static org.hamcrest.CoreMatchers.containsString
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.jupiter.api.Assertions.assertEquals
+import static org.junit.jupiter.api.Assertions.assertNotNull
+import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.assertTrue
+
+class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest<PutElasticsearchRecord> {
private static final int DATE_YEAR = 2020
private static final int DATE_MONTH = 11
private static final int DATE_DAY = 27
@@ -81,12 +85,17 @@ class PutElasticsearchRecordTest {
static final String flowFileContents =
prettyPrint(toJson(flowFileContentMaps))
+ @Override
+ PutElasticsearchRecord getProcessor() {
+ return new PutElasticsearchRecord()
+ }
+
@BeforeEach
void setup() {
clientService = new MockBulkLoadClientService()
registry = new MockSchemaRegistry()
reader = new JsonTreeReader()
- runner = TestRunners.newTestRunner(PutElasticsearchRecord.class)
+ runner = createRunner()
registry.addSchema("simple", AvroTypeUtil.createSchema(new
Schema.Parser().parse(SCHEMA)))
@@ -141,6 +150,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
if (success > 0) {
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESS).forEach({
ff ->
@@ -319,6 +329,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -374,6 +385,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -416,6 +428,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -455,6 +468,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -477,6 +491,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -502,6 +517,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -528,6 +544,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -604,6 +621,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.clearTransferState()
@@ -658,6 +676,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -681,6 +700,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -691,6 +711,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
}
@Test
@@ -734,6 +755,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
1)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
@@ -760,6 +782,7 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
1)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
0)
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
@@ -776,7 +799,9 @@ class PutElasticsearchRecordTest {
runner.clearTransferState()
runner.clearProvenanceEvents()
+ // errors still counted/logged even if not outputting to the error
relationship
runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER)
+ runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES,
"true")
runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name':
'errorTest' ])
runner.assertValid()
runner.run()
@@ -786,10 +811,15 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS,
0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES,
1)
+
+ final String errorResponses =
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
+ assertThat(errorResponses, containsString("not_found"))
+ assertThat(errorResponses, containsString("For input string: 20abc"))
assertEquals(1,
runner.getProvenanceEvents().stream().filter({
- e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [1 error(s), 4
success(es)]"
+ e -> ProvenanceEventType.SEND == e.getEventType() &&
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3
success(es)]"
}).count()
)
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 7df9de2f07..48610e8261 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -55,7 +55,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
+ .parse(System.getProperty("elasticsearch.docker.image",
"docker.elastic.co/elasticsearch/elasticsearch:8.7.0"));
protected static final String ELASTIC_USER_PASSWORD =
System.getProperty("elasticsearch.elastic_user.password",
RandomStringUtils.randomAlphanumeric(10, 20));
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
new ElasticsearchContainer(IMAGE)
.withPassword(ELASTIC_USER_PASSWORD)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index e1c2ec6036..c372e11120 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -87,7 +87,7 @@ language governing permissions and limitations under the
License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
- <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
+ <elasticsearch_docker_image>8.7.0</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>