This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cd7edb1  NIFI-5172 Adding the ability to specify a record writer for 
PutElasticsearchHttpRecord in order to individually handle failed records
cd7edb1 is described below

commit cd7edb1c04fdd977de1fa30d1dbe4bf93c4afda2
Author: Joe Percivall <[email protected]>
AuthorDate: Sun Feb 10 19:47:31 2019 -0500

    NIFI-5172 Adding the ability to specify a record writer for 
PutElasticsearchHttpRecord in order to individually handle failed records
    
    Addressing PR feedback
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #3299
---
 .../elasticsearch/PutElasticsearchHttpRecord.java  | 129 +++++++++++++++++++--
 .../TestPutElasticsearchHttpRecord.java            | 106 +++++++++++++++--
 2 files changed, 214 insertions(+), 21 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index d431960..87dc5c3 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -55,6 +55,8 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
@@ -72,6 +74,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.math.BigInteger;
 import java.net.URL;
 import java.nio.charset.Charset;
@@ -121,6 +124,31 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
             .required(true)
             .build();
 
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("put-es-record-record-writer")
+            .displayName("Record Writer")
+            .description("After sending a batch of records, Elasticsearch will 
report if individual records failed to insert. As an example, this can happen 
if the record doesn't match the mapping" +
+                    "for the index it is being inserted into. This property 
specifies the Controller Service to use for writing out those individual 
records sent to 'failure'. If this is not set, " +
+                    "then the whole FlowFile will be routed to failure 
(including any records which may have been inserted successfully). Note that 
this will only be used if Elasticsearch reports " +
+                    "that individual records failed and that in the event that 
the entire FlowFile fails (e.g. in the event ES is down), the FF will be routed 
to failure without being interpreted " +
+                    "by this record writer. If there is an error while 
attempting to route the failures, the entire FlowFile will be routed to 
Failure. Also if every record failed individually, " +
+                    "the entire FlowFile will be routed to Failure without 
being parsed by the writer.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor LOG_ALL_ERRORS = new 
PropertyDescriptor.Builder()
+            .name("put-es-record-log-all-errors")
+            .displayName("Log all errors in batch")
+            .description("After sending a batch of records, Elasticsearch will 
report if individual records failed to insert. As an example, this can happen 
if the record doesn't match the mapping " +
+                    "for the index it is being inserted into. If this is set 
to true, the processor will log the failure reason for the every failed record. 
When set to false only the first error " +
+                    "in the batch will be logged.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
     static final PropertyDescriptor ID_RECORD_PATH = new 
PropertyDescriptor.Builder()
             .name("put-es-record-id-path")
             .displayName("Identifier Record Path")
@@ -222,6 +250,7 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile Boolean logAllErrors;
 
     static {
         final Set<Relationship> _rels = new HashSet<>();
@@ -232,6 +261,8 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
 
         final List<PropertyDescriptor> descriptors = new 
ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
         descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(LOG_ALL_ERRORS);
         descriptors.add(ID_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
@@ -299,6 +330,8 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
         if (this.timestampFormat == null) {
             this.timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
         }
+
+        logAllErrors = context.getProperty(LOG_ALL_ERRORS).asBoolean();
     }
 
     @Override
@@ -310,6 +343,13 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
         }
 
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Optional<RecordSetWriterFactory> writerFactoryOptional;
+
+        if (context.getProperty(RECORD_WRITER).isSet()) {
+            writerFactoryOptional = 
Optional.of(context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class));
+        } else {
+            writerFactoryOptional = Optional.empty();
+        }
 
         // Authentication
         final String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -429,14 +469,14 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
         }
         final int statusCode = getResponse.code();
 
+        final Set<Integer> failures = new HashSet<>();
+
         if (isSuccess(statusCode)) {
-            ResponseBody responseBody = getResponse.body();
-            try {
+            try (ResponseBody responseBody = getResponse.body()) {
                 final byte[] bodyBytes = responseBody.bytes();
 
                 JsonNode responseJson = parseJsonResponse(new 
ByteArrayInputStream(bodyBytes));
                 boolean errors = responseJson.get("errors").asBoolean(false);
-                int failureCount = 0;
                 // ES has no rollback, so if errors occur, log them and route 
the whole flow file to failure
                 if (errors) {
                     ArrayNode itemNodeArray = (ArrayNode) 
responseJson.get("items");
@@ -450,7 +490,7 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                                 JsonNode itemNode = itemNodeArray.get(i);
                                 int status = 
itemNode.findPath("status").asInt();
                                 if (!isSuccess(status)) {
-                                    if (errorReason == null) {
+                                    if (errorReason == null || logAllErrors) {
                                         // Use "result" if it is present; this 
happens for status codes like 404 Not Found, which may not have an error/reason
                                         String reason = 
itemNode.findPath("result").asText();
                                         if (StringUtils.isEmpty(reason)) {
@@ -458,20 +498,21 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                                             reason = 
itemNode.findPath("reason").asText();
                                         }
                                         errorReason = reason;
-                                        logger.error("Failed to process {} due 
to {}, transferring to failure",
-                                                new Object[]{flowFile, 
errorReason});
+
+                                        logger.error("Failed to process record 
{} in FlowFile {} due to {}, transferring to failure",
+                                                new Object[]{i, flowFile, 
errorReason});
                                     }
-                                    failureCount++;
+                                    failures.add(i);
                                 }
                             }
                         }
                     }
-                    flowFile = session.putAttribute(flowFile, "failure.count", 
Integer.toString(failureCount));
-                    session.transfer(flowFile, REL_FAILURE);
                 } else {
+                    // Everything succeeded, route FF and end
                     flowFile = session.putAttribute(flowFile, "record.count", 
Integer.toString(recordCount));
                     session.transfer(flowFile, REL_SUCCESS);
                     session.getProvenanceReporter().send(flowFile, 
url.toString());
+                    return;
                 }
 
             } catch (IOException ioe) {
@@ -479,6 +520,9 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                 logger.error("Error parsing Bulk API response: {}", new 
Object[]{ioe.getMessage()}, ioe);
                 session.transfer(flowFile, REL_FAILURE);
                 context.yield();
+                return;
+            } finally {
+                getResponse.close();
             }
         } else if (statusCode / 100 == 5) {
             // 5xx -> RETRY, but a server error might last a while, so yield
@@ -486,11 +530,76 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                     new Object[]{statusCode, getResponse.message()});
             session.transfer(flowFile, REL_RETRY);
             context.yield();
+            return;
         } else {  // 1xx, 3xx, 4xx, etc. -> NO RETRY
             logger.warn("Elasticsearch returned code {} with message {}, 
transferring flow file to failure", new Object[]{statusCode, 
getResponse.message()});
             session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // If everything failed or we don't have a writer factory, route the 
entire original FF to failure.
+        if ((!failures.isEmpty() && failures.size() == recordCount ) || 
!writerFactoryOptional.isPresent()) {
+            flowFile = session.putAttribute(flowFile, "failure.count", 
Integer.toString(failures.size()));
+            session.transfer(flowFile, REL_FAILURE);
+
+        } else if (!failures.isEmpty()) {
+            // Some of the records failed and we have a writer, handle the 
failures individually.
+            final RecordSetWriterFactory writerFactory = 
writerFactoryOptional.get();
+
+            // We know there are a mixture of successes and failures, create 
FFs for each and rename input FF to avoid confusion.
+            final FlowFile inputFlowFile = flowFile;
+            final FlowFile successFlowFile = session.create(inputFlowFile);
+            final FlowFile failedFlowFile = session.create(inputFlowFile);
+
+            // Set up the reader and writers
+            try (final OutputStream successOut = 
session.write(successFlowFile);
+                 final OutputStream failedOut = session.write(failedFlowFile);
+                 final InputStream in = session.read(inputFlowFile);
+                 final RecordReader reader = 
readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
+
+                final RecordSchema schema = 
writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
+
+                try (final RecordSetWriter successWriter = 
writerFactory.createWriter(getLogger(), schema, successOut);
+                     final RecordSetWriter failedWriter = 
writerFactory.createWriter(getLogger(), schema, failedOut)) {
+
+                    successWriter.beginRecordSet();
+                    failedWriter.beginRecordSet();
+
+                    // For each record, if it's in the failure set write it to 
the failure FF, otherwise it succeeded.
+                    Record record;
+                    int i = 0;
+                    while ((record = reader.nextRecord(false, false)) != null) 
{
+                        if (failures.contains(i)) {
+                            failedWriter.write(record);
+                        } else {
+                            successWriter.write(record);
+                        }
+                        i++;
+                    }
+                }
+
+                session.putAttribute(successFlowFile, "record.count", 
Integer.toString(recordCount - failures.size()));
+
+                // Normal behavior is to output with record.count. In order to 
not break backwards compatibility, set both here.
+                session.putAttribute(failedFlowFile, "record.count", 
Integer.toString(failures.size()));
+                session.putAttribute(failedFlowFile, "failure.count", 
Integer.toString(failures.size()));
+
+                session.transfer(successFlowFile, REL_SUCCESS);
+                session.transfer(failedFlowFile, REL_FAILURE);
+                session.remove(inputFlowFile);
+
+            } catch (final IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+                // We failed while handling individual failures. Not much else 
we can do other than log, and route the whole thing to failure.
+                getLogger().error("Failed to process {} during individual 
record failure handling; route whole FF to failure", new Object[] {flowFile, 
e});
+                session.transfer(inputFlowFile, REL_FAILURE);
+                if (successFlowFile != null) {
+                    session.remove(successFlowFile);
+                }
+                if (failedFlowFile != null) {
+                    session.remove(failedFlowFile);
+                }
+            }
         }
-        getResponse.close();
     }
 
     private void writeRecord(final Record record, final RecordSchema 
writeSchema, final JsonGenerator generator)
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 992e615..9104df9 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -31,6 +31,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -76,21 +77,21 @@ public class TestPutElasticsearchHttpRecord {
             assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(2, record.get("id"));
-            assertEquals("ræc2", record.get("name"));
+            assertEquals("reç2", record.get("name"));
             assertEquals(102, record.get("code"));
             assertEquals("20/12/2018", record.get("date"));
             assertEquals("6:55 PM", record.get("time"));
             assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(3, record.get("id"));
-            assertEquals("rèc3", record.get("name"));
+            assertEquals("reç3", record.get("name"));
             assertEquals(103, record.get("code"));
             assertEquals("20/12/2018", record.get("date"));
             assertEquals("6:55 PM", record.get("time"));
             assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(4, record.get("id"));
-            assertEquals("rëc4", record.get("name"));
+            assertEquals("reç4", record.get("name"));
             assertEquals(104, record.get("code"));
             assertEquals("20/12/2018", record.get("date"));
             assertEquals("6:55 PM", record.get("time"));
@@ -397,11 +398,76 @@ public class TestPutElasticsearchHttpRecord {
         assertEquals(ProvenanceEventType.SEND, 
provEvents.get(0).getEventType());
     }
 
+    @Test
+    public void testPutElasticsearchOnTriggerFailureWithWriter() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchHttpRecordTestProcessor(true)); // simulate failures
+        generateTestData(1);
+        generateWriter();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        MockFlowFile flowFileFailure = 
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        flowFileFailure.assertAttributeEquals("failure.count", "1");
+    }
+
+    @Test
+    public void 
testPutElasticsearchOnTriggerFailureWithWriterMultipleRecords() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
+        generateTestData();
+        generateWriter();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        MockFlowFile flowFileSuccess = 
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        flowFileSuccess.assertAttributeEquals("record.count", "2");
+        MockFlowFile flowFileFailure = 
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        flowFileFailure.assertAttributeEquals("record.count", "2");
+        flowFileFailure.assertAttributeEquals("failure.count", "2");
+
+        assertEquals(1, runner.getLogger().getErrorMessages().size());
+    }
+
+    @Test
+    public void 
testPutElasticsearchOnTriggerFailureWithWriterMultipleRecordsLogging() throws 
IOException {
+        runner = TestRunners.newTestRunner(new 
PutElasticsearchHttpRecordTestProcessor(2)); // simulate failures
+        generateTestData();
+        generateWriter();
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+        runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
+        runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
+        runner.setProperty(PutElasticsearchHttpRecord.LOG_ALL_ERRORS, "true");
+
+        runner.enqueue(new byte[0]);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
+        MockFlowFile flowFileSuccess = 
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
+        flowFileSuccess.assertAttributeEquals("record.count", "2");
+        MockFlowFile flowFileFailure = 
runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_FAILURE).get(0);
+        flowFileFailure.assertAttributeEquals("record.count", "2");
+        flowFileFailure.assertAttributeEquals("failure.count", "2");
+
+        assertEquals(2, runner.getLogger().getErrorMessages().size());
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
     private static class PutElasticsearchHttpRecordTestProcessor extends 
PutElasticsearchHttpRecord {
-        boolean responseHasFailures = false;
+        int numResponseFailures = 0;
         OkHttpClient client;
         int statusCode = 200;
         String statusMessage = "OK";
@@ -409,7 +475,11 @@ public class TestPutElasticsearchHttpRecord {
         Consumer<Map>[] recordChecks;
 
         PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
-            this.responseHasFailures = responseHasFailures;
+            this.numResponseFailures = responseHasFailures ? 1 : 0;
+        }
+
+        PutElasticsearchHttpRecordTestProcessor(int numResponseFailures) {
+            this.numResponseFailures = numResponseFailures;
         }
 
         void setStatus(int code, String message) {
@@ -454,9 +524,9 @@ public class TestPutElasticsearchHttpRecord {
                         }
                     }
                     StringBuilder sb = new StringBuilder("{\"took\": 1, 
\"errors\": \"");
-                    sb.append(responseHasFailures);
+                    sb.append(numResponseFailures > 0);
                     sb.append("\", \"items\": [");
-                    if (responseHasFailures) {
+                    for (int i = 0; i < numResponseFailures; i ++) {
                         // This case is for a status code of 200 for the bulk 
response itself, but with an error (of 400) inside
                         
sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\",");
                         
sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed 
to parse [gender]\",");
@@ -569,6 +639,10 @@ public class TestPutElasticsearchHttpRecord {
     }
 
     private void generateTestData() throws IOException {
+        generateTestData(4);
+    }
+
+    private void generateTestData(int numRecords) throws IOException {
 
         final MockRecordParser parser = new MockRecordParser();
         try {
@@ -586,9 +660,19 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("time", RecordFieldType.TIME);
         parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
 
-        parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
-        parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
-        parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
-        parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
+        for(int i=1; i<=numRecords; i++) {
+            parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), 
new Time(68150000), new Timestamp(1545332150000L));
+        }
+    }
+
+    private void generateWriter() throws IOException {
+        final MockRecordWriter writer = new MockRecordWriter();
+        try {
+            runner.addControllerService("writer", writer);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(writer);
+        runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
     }
 }

Reply via email to