This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new cd7edb1 NIFI-5172 Adding the ability to specify a record writer for
PutElasticsearchHttpRecord in order to individually handle failed records
cd7edb1 is described below
commit cd7edb1c04fdd977de1fa30d1dbe4bf93c4afda2
Author: Joe Percivall <[email protected]>
AuthorDate: Sun Feb 10 19:47:31 2019 -0500
NIFI-5172 Adding the ability to specify a record writer for
PutElasticsearchHttpRecord in order to individually handle failed records
Addressing PR feedback
Signed-off-by: Matthew Burgess <[email protected]>
This closes #3299
---
.../elasticsearch/PutElasticsearchHttpRecord.java | 129 +++++++++++++++++++--
.../TestPutElasticsearchHttpRecord.java | 106 +++++++++++++++--
2 files changed, 214 insertions(+), 21 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index d431960..87dc5c3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -55,6 +55,8 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
@@ -72,6 +74,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
@@ -121,6 +124,31 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
.required(true)
.build();
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("put-es-record-record-writer")
+ .displayName("Record Writer")
+ .description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping " +
+ "for the index it is being inserted into. This property specifies the Controller Service to use for writing out those individual records sent to 'failure'. If this is not set, " +
+ "then the whole FlowFile will be routed to failure (including any records which may have been inserted successfully). Note that this will only be used if Elasticsearch reports " +
+ "that individual records failed and that in the event that the entire FlowFile fails (e.g. in the event ES is down), the FF will be routed to failure without being interpreted " +
+ "by this record writer. If there is an error while attempting to route the failures, the entire FlowFile will be routed to Failure. Also if every record failed individually, " +
+ "the entire FlowFile will be routed to Failure without being parsed by the writer.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(false) // optional: when unset, a batch with any failed record routes the whole FlowFile to failure
+ .build();
+
+ static final PropertyDescriptor LOG_ALL_ERRORS = new PropertyDescriptor.Builder()
+ .name("put-es-record-log-all-errors")
+ .displayName("Log all errors in batch")
+ .description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping " +
+ "for the index it is being inserted into. If this is set to true, the processor will log the failure reason for every failed record. When set to false only the first error " +
+ "in the batch will be logged.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .required(false)
+ .defaultValue("false") // default keeps the previous behaviour: only the first error in a batch is logged
+ .allowableValues("true", "false")
+ .build();
+
static final PropertyDescriptor ID_RECORD_PATH = new
PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("Identifier Record Path")
@@ -222,6 +250,7 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
+ private volatile Boolean logAllErrors;
static {
final Set<Relationship> _rels = new HashSet<>();
@@ -232,6 +261,8 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
final List<PropertyDescriptor> descriptors = new
ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(RECORD_READER);
+ descriptors.add(RECORD_WRITER);
+ descriptors.add(LOG_ALL_ERRORS);
descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -299,6 +330,8 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
if (this.timestampFormat == null) {
this.timestampFormat =
RecordFieldType.TIMESTAMP.getDefaultFormat();
}
+
+ logAllErrors = context.getProperty(LOG_ALL_ERRORS).asBoolean();
}
@Override
@@ -310,6 +343,13 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
}
final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final Optional<RecordSetWriterFactory> writerFactoryOptional;
+
+ if (context.getProperty(RECORD_WRITER).isSet()) {
+ writerFactoryOptional =
Optional.of(context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class));
+ } else {
+ writerFactoryOptional = Optional.empty();
+ }
// Authentication
final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -429,14 +469,14 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
}
final int statusCode = getResponse.code();
+ final Set<Integer> failures = new HashSet<>();
+
if (isSuccess(statusCode)) {
- ResponseBody responseBody = getResponse.body();
- try {
+ try (ResponseBody responseBody = getResponse.body()) {
final byte[] bodyBytes = responseBody.bytes();
JsonNode responseJson = parseJsonResponse(new
ByteArrayInputStream(bodyBytes));
boolean errors = responseJson.get("errors").asBoolean(false);
- int failureCount = 0;
// ES has no rollback, so if errors occur, log them and route
the whole flow file to failure
if (errors) {
ArrayNode itemNodeArray = (ArrayNode)
responseJson.get("items");
@@ -450,7 +490,7 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
JsonNode itemNode = itemNodeArray.get(i);
int status =
itemNode.findPath("status").asInt();
if (!isSuccess(status)) {
- if (errorReason == null) {
+ if (errorReason == null || logAllErrors) {
// Use "result" if it is present; this
happens for status codes like 404 Not Found, which may not have an error/reason
String reason =
itemNode.findPath("result").asText();
if (StringUtils.isEmpty(reason)) {
@@ -458,20 +498,21 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
reason =
itemNode.findPath("reason").asText();
}
errorReason = reason;
- logger.error("Failed to process {} due
to {}, transferring to failure",
- new Object[]{flowFile,
errorReason});
+
+ logger.error("Failed to process record
{} in FlowFile {} due to {}, transferring to failure",
+ new Object[]{i, flowFile,
errorReason});
}
- failureCount++;
+ failures.add(i);
}
}
}
}
- flowFile = session.putAttribute(flowFile, "failure.count",
Integer.toString(failureCount));
- session.transfer(flowFile, REL_FAILURE);
} else {
+ // Everything succeeded, route FF and end
flowFile = session.putAttribute(flowFile, "record.count",
Integer.toString(recordCount));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile,
url.toString());
+ return;
}
} catch (IOException ioe) {
@@ -479,6 +520,9 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
logger.error("Error parsing Bulk API response: {}", new
Object[]{ioe.getMessage()}, ioe);
session.transfer(flowFile, REL_FAILURE);
context.yield();
+ return;
+ } finally {
+ getResponse.close();
}
} else if (statusCode / 100 == 5) {
// 5xx -> RETRY, but a server error might last a while, so yield
@@ -486,11 +530,76 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
new Object[]{statusCode, getResponse.message()});
session.transfer(flowFile, REL_RETRY);
context.yield();
+ return;
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
logger.warn("Elasticsearch returned code {} with message {},
transferring flow file to failure", new Object[]{statusCode,
getResponse.message()});
session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ // If everything failed or we don't have a writer factory, route the
entire original FF to failure.
+ if ((!failures.isEmpty() && failures.size() == recordCount ) ||
!writerFactoryOptional.isPresent()) {
+ flowFile = session.putAttribute(flowFile, "failure.count",
Integer.toString(failures.size()));
+ session.transfer(flowFile, REL_FAILURE);
+
+ } else if (!failures.isEmpty()) {
+ // Some of the records failed and we have a writer, handle the
failures individually.
+ final RecordSetWriterFactory writerFactory =
writerFactoryOptional.get();
+
+ // We know there are a mixture of successes and failures, create
FFs for each and rename input FF to avoid confusion.
+ final FlowFile inputFlowFile = flowFile;
+ final FlowFile successFlowFile = session.create(inputFlowFile);
+ final FlowFile failedFlowFile = session.create(inputFlowFile);
+
+ // Set up the reader and writers
+ try (final OutputStream successOut =
session.write(successFlowFile);
+ final OutputStream failedOut = session.write(failedFlowFile);
+ final InputStream in = session.read(inputFlowFile);
+ final RecordReader reader =
readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
+
+ final RecordSchema schema =
writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
+
+ try (final RecordSetWriter successWriter =
writerFactory.createWriter(getLogger(), schema, successOut);
+ final RecordSetWriter failedWriter =
writerFactory.createWriter(getLogger(), schema, failedOut)) {
+
+ successWriter.beginRecordSet();
+ failedWriter.beginRecordSet();
+
+ // For each record, if it's in the failure set write it to
the failure FF, otherwise it succeeded.
+ Record record;
+ int i = 0;
+ while ((record = reader.nextRecord(false, false)) != null)
{
+ if (failures.contains(i)) {
+ failedWriter.write(record);
+ } else {
+ successWriter.write(record);
+ }
+ i++;
+ }
+ }
+
+ session.putAttribute(successFlowFile, "record.count",
Integer.toString(recordCount - failures.size()));
+
+ // Normal behavior is to output with record.count. In order to
not break backwards compatibility, set both here.
+ session.putAttribute(failedFlowFile, "record.count",
Integer.toString(failures.size()));
+ session.putAttribute(failedFlowFile, "failure.count",
Integer.toString(failures.size()));
+
+ session.transfer(successFlowFile, REL_SUCCESS);
+ session.transfer(failedFlowFile, REL_FAILURE);
+ session.remove(inputFlowFile);
+
+ } catch (final IOException | SchemaNotFoundException |
MalformedRecordException e) {
+ // We failed while handling individual failures. Not much else
we can do other than log, and route the whole thing to failure.
+ getLogger().error("Failed to process {} during individual
record failure handling; route whole FF to failure", new Object[] {flowFile,
e});
+ session.transfer(inputFlowFile, REL_FAILURE);
+ if (successFlowFile != null) {
+ session.remove(successFlowFile);
+ }
+ if (failedFlowFile != null) {
+ session.remove(failedFlowFile);
+ }
+ }
}
- getResponse.close();
}
private void writeRecord(final Record record, final RecordSchema
writeSchema, final JsonGenerator generator)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 992e615..9104df9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -31,6 +31,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -76,21 +77,21 @@ public class TestPutElasticsearchHttpRecord {
assertEquals("20/12/2018 6:55 PM", record.get("ts"));
}, record -> {
assertEquals(2, record.get("id"));
- assertEquals("ræc2", record.get("name"));
+ assertEquals("reç2", record.get("name"));
assertEquals(102, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals("6:55 PM", record.get("time"));
assertEquals("20/12/2018 6:55 PM", record.get("ts"));
}, record -> {
assertEquals(3, record.get("id"));
- assertEquals("rèc3", record.get("name"));
+ assertEquals("reç3", record.get("name"));
assertEquals(103, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals("6:55 PM", record.get("time"));
assertEquals("20/12/2018 6:55 PM", record.get("ts"));
}, record -> {
assertEquals(4, record.get("id"));
- assertEquals("rëc4", record.get("name"));
+ assertEquals("reç4", record.get("name"));
assertEquals(104, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals("6:55 PM", record.get("time"));
@@ -397,11 +398,76 @@ public class TestPutElasticsearchHttpRecord {
assertEquals(ProvenanceEventType.SEND,
provEvents.get(0).getEventType());
}
+ @Test
+ public void testPutElasticsearchOnTriggerFailureWithWriter() throws
IOException {
+ runner = TestRunners.newTestRunner(new
PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
+ generateTestData(1);
+ generateWriter();
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+ MockFlowFile flowFileFailure =
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+ flowFileFailure.assertAttributeEquals("failure.count", "1");
+ }
+
+ @Test
+ public void
testPutElasticsearchOnTriggerFailureWithWriterMultipleRecords() throws
IOException {
+ runner = TestRunners.newTestRunner(new
PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
+ generateTestData();
+ generateWriter();
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+ MockFlowFile flowFileSuccess =
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ flowFileSuccess.assertAttributeEquals("record.count", "2");
+ MockFlowFile flowFileFailure =
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+ flowFileFailure.assertAttributeEquals("record.count", "2");
+ flowFileFailure.assertAttributeEquals("failure.count", "2");
+
+ assertEquals(1, runner.getLogger().getErrorMessages().size());
+ }
+
+ @Test
+ public void
testPutElasticsearchOnTriggerFailureWithWriterMultipleRecordsLogging() throws
IOException {
+ runner = TestRunners.newTestRunner(new
PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
+ generateTestData();
+ generateWriter();
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://127.0.0.1:9200");
+ runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+ runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+ runner.setProperty(PutElasticsearchHttpRecord.LOG_ALL_ERRORS, "true");
+
+ runner.enqueue(new byte[0]);
+ runner.run(1, true, true);
+
+ runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+ MockFlowFile flowFileSuccess =
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+ flowFileSuccess.assertAttributeEquals("record.count", "2");
+ MockFlowFile flowFileFailure =
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+ flowFileFailure.assertAttributeEquals("record.count", "2");
+ flowFileFailure.assertAttributeEquals("failure.count", "2");
+
+ assertEquals(2, runner.getLogger().getErrorMessages().size());
+ }
+
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
private static class PutElasticsearchHttpRecordTestProcessor extends
PutElasticsearchHttpRecord {
- boolean responseHasFailures = false;
+ int numResponseFailures = 0;
OkHttpClient client;
int statusCode = 200;
String statusMessage = "OK";
@@ -409,7 +475,11 @@ public class TestPutElasticsearchHttpRecord {
Consumer<Map>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
- this.responseHasFailures = responseHasFailures;
+ this.numResponseFailures = responseHasFailures ? 1 : 0;
+ }
+
+ PutElasticsearchHttpRecordTestProcessor(int numResponseFailures) {
+ this.numResponseFailures = numResponseFailures;
}
void setStatus(int code, String message) {
@@ -454,9 +524,9 @@ public class TestPutElasticsearchHttpRecord {
}
}
StringBuilder sb = new StringBuilder("{\"took\": 1,
\"errors\": \"");
- sb.append(responseHasFailures);
+ sb.append(numResponseFailures > 0);
sb.append("\", \"items\": [");
- if (responseHasFailures) {
+ for (int i = 0; i < numResponseFailures; i ++) {
// This case is for a status code of 200 for the bulk
response itself, but with an error (of 400) inside
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed
to parse [gender]\",");
@@ -569,6 +639,10 @@ public class TestPutElasticsearchHttpRecord {
}
private void generateTestData() throws IOException {
+ generateTestData(4);
+ }
+
+ private void generateTestData(int numRecords) throws IOException {
final MockRecordParser parser = new MockRecordParser();
try {
@@ -586,9 +660,19 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("time", RecordFieldType.TIME);
parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
- parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new
Time(68150000), new Timestamp(1545332150000L));
- parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new
Time(68150000), new Timestamp(1545332150000L));
- parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new
Time(68150000), new Timestamp(1545332150000L));
- parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new
Time(68150000), new Timestamp(1545332150000L));
+ for(int i=1; i<=numRecords; i++) {
+ parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L),
new Time(68150000), new Timestamp(1545332150000L));
+ }
+ }
+
+ private void generateWriter() throws IOException {
+ final MockRecordWriter writer = new MockRecordWriter();
+ try {
+ runner.addControllerService("writer", writer);
+ } catch (InitializationException e) {
+ throw new IOException(e);
+ }
+ runner.enableControllerService(writer);
+ runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
}
}