markap14 commented on a change in pull request #4952:
URL: https://github.com/apache/nifi/pull/4952#discussion_r726499496
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
##########
@@ -46,43 +53,62 @@
private volatile ComponentLog componentLogger;
@Override
- public void addLogMessage(final LogLevel level, final String message) {
- addLogMessage(level, message, (Throwable) null);
- }
-
- @Override
- public void addLogMessage(final LogLevel level, final String message,
final Throwable t) {
- final LogMessage logMessage = new
LogMessage(System.currentTimeMillis(), level, message, t);
+ public void addLogMessage(LogMessage logMessage) {
+ LogLevel logLevel = logMessage.getLogLevel();
- final Collection<LogObserver> logObservers = observers.get(level);
+ final Collection<LogObserver> logObservers = observers.get(logLevel);
if (logObservers != null) {
for (LogObserver observer : logObservers) {
try {
observer.onLogMessage(logMessage);
- } catch (final Throwable observerThrowable) {
+ } catch (final Exception observerThrowable) {
logger.error("Failed to pass log message to Observer {}
due to {}", observer, observerThrowable.toString());
}
}
}
+
}
@Override
public void addLogMessage(final LogLevel level, final String format, final
Object[] params) {
replaceThrowablesWithMessage(params);
+ final Optional<String> flowFileUuid =
getFirstFlowFileUuidFromObjects(params);
final String formattedMessage = MessageFormatter.arrayFormat(format,
params).getMessage();
- addLogMessage(level, formattedMessage);
+ final LogMessage logMessage = new
LogMessage.Builder(System.currentTimeMillis(), level)
+ .message(formattedMessage)
+ .flowFileUuid(flowFileUuid.orElse(null))
+ .createLogMessage();
+ addLogMessage(logMessage);
}
@Override
public void addLogMessage(final LogLevel level, final String format, final
Object[] params, final Throwable t) {
replaceThrowablesWithMessage(params);
+ final Optional<String> flowFileUuid =
getFirstFlowFileUuidFromObjects(params);
final String formattedMessage = MessageFormatter.arrayFormat(format,
params, t).getMessage();
- addLogMessage(level, formattedMessage, t);
+ final LogMessage logMessage = new
LogMessage.Builder(System.currentTimeMillis(), level)
+ .message(formattedMessage)
+ .throwable(t)
+ .flowFileUuid(flowFileUuid.orElse(null))
+ .createLogMessage();
+ addLogMessage(logMessage);
+ }
+
+ private Optional<String> getFirstFlowFileUuidFromObjects(Object[] params) {
+ final List<FlowFile> flowFiles = Arrays.stream(params)
+ .filter(FlowFile.class::isInstance)
+ .map(FlowFile.class::cast)
+ .collect(Collectors.toList());
Review comment:
We need to avoid creation of a `Stream` here. `Stream` creation is very
costly, and this is called for every log message. Should instead just loop over
the array. Perhaps something like:
```
int flowFileCount = 0;
FlowFile flowFileFound = null;
for (final Object param : params) {
if (param instanceof FlowFile) {
if (++flowFileCount > 1) {
return Optional.empty();
}
flowFileFound = (FlowFile) param;
}
}
return Optional.ofNullable(flowFileFound).map(ff ->
ff.getAttribute(CoreAttributes.UUID.key()));
```
##########
File path:
nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
##########
@@ -162,23 +161,21 @@ public void onTrigger(final ReportingContext context) {
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Bulletins to destination in
{} ms; Transaction ID = {}; First Event ID = {}",
- new Object[]{bulletins.size(), transferMillis,
transactionId, bulletins.get(0).getId()});
- } catch (final Exception e) {
+ bulletins.size(), transferMillis, transactionId,
bulletins.get(0).getId());
+ } catch (ProcessException e) {
+ throw e;
+ } catch (Exception e) {
if (transaction != null) {
transaction.error();
}
- if (e instanceof ProcessException) {
- throw (ProcessException) e;
- } else {
- throw new ProcessException("Failed to send Bulletins to
destination due to IOException:" + e.getMessage(), e);
- }
+ throw new ProcessException("Failed to send Bulletins to
destination due to IOException:" + e.getMessage(), e);
Review comment:
This logic should not be changed. Previously, when any Exception was
caught, it called `transaction.error()` (assuming transaction != null). With
this change, that doesn't happen when `ProcessException` is thrown, but it
needs to.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]