mattyb149 commented on a change in pull request #3299: NIFI-5172 Adding the 
ability to specify a record writer for PutElasti…
URL: https://github.com/apache/nifi/pull/3299#discussion_r259927447
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 ##########
 @@ -450,47 +490,117 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
                                 JsonNode itemNode = itemNodeArray.get(i);
                                 int status = 
itemNode.findPath("status").asInt();
                                 if (!isSuccess(status)) {
-                                    if (errorReason == null) {
+                                    if (errorReason == null || logAllErrors) {
                                         // Use "result" if it is present; this 
happens for status codes like 404 Not Found, which may not have an error/reason
                                         String reason = 
itemNode.findPath("result").asText();
                                         if (StringUtils.isEmpty(reason)) {
                                             // If there was no result, we 
expect an error with a string description in the "reason" field
                                             reason = 
itemNode.findPath("reason").asText();
                                         }
                                         errorReason = reason;
-                                        logger.error("Failed to process {} due 
to {}, transferring to failure",
-                                                new Object[]{flowFile, 
errorReason});
+
+                                        logger.error("Failed to process record 
{} in FlowFile {} due to {}, transferring to failure",
+                                                new Object[]{i, flowFile, 
errorReason});
                                     }
-                                    failureCount++;
+                                    failures.add(i);
                                 }
                             }
                         }
                     }
-                    flowFile = session.putAttribute(flowFile, "failure.count", 
Integer.toString(failureCount));
-                    session.transfer(flowFile, REL_FAILURE);
                 } else {
+                    // Everything succeeded, route FF and end
                     flowFile = session.putAttribute(flowFile, "record.count", 
Integer.toString(recordCount));
                     session.transfer(flowFile, REL_SUCCESS);
                     session.getProvenanceReporter().send(flowFile, 
url.toString());
+                    return;
                 }
 
             } catch (IOException ioe) {
                 // Something went wrong when parsing the response, log the 
error and route to failure
                 logger.error("Error parsing Bulk API response: {}", new 
Object[]{ioe.getMessage()}, ioe);
                 session.transfer(flowFile, REL_FAILURE);
                 context.yield();
+                return;
+            } finally {
+                getResponse.close();
             }
         } else if (statusCode / 100 == 5) {
             // 5xx -> RETRY, but a server error might last a while, so yield
             logger.warn("Elasticsearch returned code {} with message {}, 
transferring flow file to retry. This is likely a server problem, yielding...",
                     new Object[]{statusCode, getResponse.message()});
             session.transfer(flowFile, REL_RETRY);
             context.yield();
+            return;
         } else {  // 1xx, 3xx, 4xx, etc. -> NO RETRY
             logger.warn("Elasticsearch returned code {} with message {}, 
transferring flow file to failure", new Object[]{statusCode, 
getResponse.message()});
             session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // If everything failed or we don't have a writer factory, route the 
entire original FF to failure.
+        if ((!failures.isEmpty() && failures.size() == recordCount ) || 
!writerFactoryOptional.isPresent()) {
+            flowFile = session.putAttribute(flowFile, "failure.count", 
Integer.toString(failures.size()));
+            session.transfer(flowFile, REL_FAILURE);
+
+        } else if (!failures.isEmpty()) {
+            // Some of the records failed and we have a writer, handle the 
failures individually.
+            final RecordSetWriterFactory writerFactory = 
writerFactoryOptional.get();
+
+            // We know there are a mixture of successes and failures, create 
FFs for each and rename input FF to avoid confusion.
+            final FlowFile inputFlowFile = flowFile;
+            final FlowFile successFlowFile = session.create(inputFlowFile);
+            final FlowFile failedFlowFile = session.create(inputFlowFile);
+
+            final OutputStream successOut = session.write(successFlowFile);
 
 Review comment:
   Should these streams (e.g. `successOut`) be opened in the try-with-resources block below so that they are guaranteed to be closed?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to