Lehel44 commented on code in PR #7007:
URL: https://github.com/apache/nifi/pull/7007#discussion_r1131793273
##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java:
##########
@@ -458,23 +461,58 @@ void apply(final ProcessContext context, final
ProcessSession session, final Fun
private ExceptionHandler.OnError<FunctionContext, FlowFile>
onFlowFileError(final ProcessContext context, final ProcessSession session,
final RoutingResult result) {
ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError =
createOnError(context, session, result, REL_FAILURE, REL_RETRY);
- onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
- switch (r.destination()) {
+ onFlowFileError = onFlowFileError.andThen((ctx, flowFile,
errorTypesResult, exception) -> {
+ flowFile = addErrorAttributesToFlowFile(session, flowFile,
exception);
+
+ switch (errorTypesResult.destination()) {
case Failure:
- getLogger().error("Failed to update database for {} due to
{}; routing to failure", new Object[] {i, e}, e);
+ getLogger().error("Failed to update database for {} due to
{}; routing to failure", new Object[] {flowFile, exception}, exception);
break;
case Retry:
getLogger().error("Failed to update database for {} due to
{}; it is possible that retrying the operation will succeed, so routing to
retry",
- new Object[] {i, e}, e);
+ new Object[] {flowFile, exception}, exception);
break;
case Self:
- getLogger().error("Failed to update database for {} due to
{};", new Object[] {i, e}, e);
+ getLogger().error("Failed to update database for {} due to
{};", new Object[] {flowFile, exception}, exception);
break;
}
});
return RollbackOnFailure.createOnError(onFlowFileError);
}
+ private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup>
onGroupError(final ProcessContext context, final ProcessSession session, final
RoutingResult result) {
+ ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError
+ = ExceptionHandler.createOnGroupError(context, session,
result, REL_FAILURE, REL_RETRY);
+ onGroupError = onGroupError.andThen((ctx, flowFileGroup,
errorTypesResult, exception) -> {
+
+ switch (errorTypesResult.destination()) {
+ case Failure:
+ List<FlowFile> flowFilesToFailure =
getFlowFilesOnRelationShip(result, REL_FAILURE);
+ Optional.ofNullable(flowFilesToFailure).map(flowFiles ->
+ result.getRoutedFlowFiles().put(REL_FAILURE,
addErrorAttributesToFlowFilesInGroup(session, flowFiles,
flowFileGroup.getFlowFiles(), exception)));
+ break;
+ case Retry:
+ List<FlowFile> flowFilesToRetry =
getFlowFilesOnRelationShip(result, REL_RETRY);
+ Optional.ofNullable(flowFilesToRetry).map(flowFiles ->
+ result.getRoutedFlowFiles().put(REL_RETRY,
addErrorAttributesToFlowFilesInGroup(session, flowFiles,
flowFileGroup.getFlowFiles(), exception)));
+ break;
+ }
+ });
+ return onGroupError;
+ }
Review Comment:
`RoutingResult::getRoutedFlowFiles` already returns an empty map, so
`getFlowFilesOnRelationShip` does not need to use Optional; it should simply
return an empty list when there are no flowfiles for the specified relationship.
This means that the `flowFilesToFailure` variable in `onGroupError` cannot be
null, so it doesn't need to be wrapped in an Optional either. I'd also
recommend simplifying the two-branch switch to an if statement for readability,
and then extracting the condition that selects the Relationship from the
`ErrorTypes.Destination`, since both branches depend only on that value. Once
the Relationship is computed up front, the if statement is not needed either.
```suggestion
private ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup>
onGroupError(final ProcessContext context, final ProcessSession session, final
RoutingResult result) {
ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup>
onGroupError =
ExceptionHandler.createOnGroupError(context, session,
result, REL_FAILURE, REL_RETRY);
onGroupError = onGroupError.andThen((ctx, flowFileGroup,
errorTypesResult, exception) -> {
Relationship relationship = errorTypesResult.destination() ==
ErrorTypes.Destination.Failure ? REL_FAILURE : REL_RETRY;
List<FlowFile> flowFilesToRelationship =
getFlowFilesOnRelationship(result, relationship);
result.getRoutedFlowFiles().put(relationship,
addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship,
flowFileGroup.getFlowFiles(), exception));
});
return onGroupError;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]