markap14 commented on a change in pull request #4952:
URL: https://github.com/apache/nifi/pull/4952#discussion_r726499496



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
##########
@@ -46,43 +53,62 @@
     private volatile ComponentLog componentLogger;
 
     @Override
-    public void addLogMessage(final LogLevel level, final String message) {
-        addLogMessage(level, message, (Throwable) null);
-    }
-
-    @Override
-    public void addLogMessage(final LogLevel level, final String message, 
final Throwable t) {
-        final LogMessage logMessage = new 
LogMessage(System.currentTimeMillis(), level, message, t);
+    public void addLogMessage(LogMessage logMessage) {
+        LogLevel logLevel = logMessage.getLogLevel();
 
-        final Collection<LogObserver> logObservers = observers.get(level);
+        final Collection<LogObserver> logObservers = observers.get(logLevel);
         if (logObservers != null) {
             for (LogObserver observer : logObservers) {
                 try {
                     observer.onLogMessage(logMessage);
-                } catch (final Throwable observerThrowable) {
+                } catch (final Exception observerThrowable) {
                     logger.error("Failed to pass log message to Observer {} 
due to {}", observer, observerThrowable.toString());
                 }
             }
         }
+
     }
 
     @Override
     public void addLogMessage(final LogLevel level, final String format, final 
Object[] params) {
         replaceThrowablesWithMessage(params);
+        final Optional<String> flowFileUuid = 
getFirstFlowFileUuidFromObjects(params);
         final String formattedMessage = MessageFormatter.arrayFormat(format, 
params).getMessage();
-        addLogMessage(level, formattedMessage);
+        final LogMessage logMessage = new 
LogMessage.Builder(System.currentTimeMillis(), level)
+                .message(formattedMessage)
+                .flowFileUuid(flowFileUuid.orElse(null))
+                .createLogMessage();
+        addLogMessage(logMessage);
     }
 
     @Override
     public void addLogMessage(final LogLevel level, final String format, final 
Object[] params, final Throwable t) {
         replaceThrowablesWithMessage(params);
+        final Optional<String> flowFileUuid = 
getFirstFlowFileUuidFromObjects(params);
         final String formattedMessage = MessageFormatter.arrayFormat(format, 
params, t).getMessage();
-        addLogMessage(level, formattedMessage, t);
+        final LogMessage logMessage = new 
LogMessage.Builder(System.currentTimeMillis(), level)
+                .message(formattedMessage)
+                .throwable(t)
+                .flowFileUuid(flowFileUuid.orElse(null))
+                .createLogMessage();
+        addLogMessage(logMessage);
+    }
+
+    private Optional<String> getFirstFlowFileUuidFromObjects(Object[] params) {
+        final List<FlowFile> flowFiles = Arrays.stream(params)
+                .filter(FlowFile.class::isInstance)
+                .map(FlowFile.class::cast)
+                .collect(Collectors.toList());

Review comment:
       We need to avoid creating a `Stream` here. `Stream` creation is very 
costly, and this method is called for every log message. We should instead 
just loop over the array. Perhaps something like:
   
   ```
   int flowFileCount = 0;
   FlowFile flowFileFound = null;
   for (final Object param : params) {
       if (param instanceof FlowFile) {
           if (++flowFileCount > 1) {
               return Optional.empty();
           }
           flowFileFound = (FlowFile) param;
       }
   }
   
   return Optional.ofNullable(flowFileFound).map(ff -> 
ff.getAttribute(CoreAttributes.UUID.key()));
   ```

##########
File path: 
nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
##########
@@ -162,23 +161,21 @@ public void onTrigger(final ReportingContext context) {
 
             final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
             getLogger().info("Successfully sent {} Bulletins to destination in 
{} ms; Transaction ID = {}; First Event ID = {}",
-                    new Object[]{bulletins.size(), transferMillis, 
transactionId, bulletins.get(0).getId()});
-        } catch (final Exception e) {
+                    bulletins.size(), transferMillis, transactionId, 
bulletins.get(0).getId());
+        } catch (ProcessException e) {
+            throw e;
+        } catch (Exception e) {
             if (transaction != null) {
                 transaction.error();
             }
-            if (e instanceof ProcessException) {
-                throw (ProcessException) e;
-            } else {
-                throw new ProcessException("Failed to send Bulletins to 
destination due to IOException:" + e.getMessage(), e);
-            }
+            throw new ProcessException("Failed to send Bulletins to 
destination due to IOException:" + e.getMessage(), e);

Review comment:
       This logic should not be changed. Previously, when any Exception was 
caught, the catch block called `transaction.error()` (assuming 
`transaction != null`). With this change, that doesn't happen when a 
`ProcessException` is thrown, but it needs to.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to