NIFI-250: Fixed bugs with configuring reporting tasks on restart

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1abee296
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1abee296
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1abee296

Branch: refs/heads/NIFI-250
Commit: 1abee2964380bf2aee91189f471035fe3df92919
Parents: 712327f
Author: Mark Payne <[email protected]>
Authored: Tue Mar 31 10:58:53 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Mar 31 10:58:53 2015 -0400

----------------------------------------------------------------------
 .../controller/StandardFlowSynchronizer.java    | 19 ++++++++++++-
 .../controller/tasks/ReportingTaskWrapper.java  | 29 ++++++++++----------
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1abee296/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 243f7c5..201482c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -170,10 +171,26 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                         controller.setMaxEventDrivenThreadCount(maxThreadCount 
/ 3);
                     }
 
+                    final Element reportingTasksElement = (Element) 
DomUtils.getChild(rootElement, "reportingTasks");
+                    final List<Element> taskElements;
+                    if ( reportingTasksElement == null ) {
+                        taskElements = Collections.emptyList();
+                    } else {
+                        taskElements = 
DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+                    }
+                    
+                    final Element controllerServicesElement = (Element) 
DomUtils.getChild(rootElement, "controllerServices");
+                    final List<Element> controllerServiceElements;
+                    if ( controllerServicesElement == null ) {
+                        controllerServiceElements = Collections.emptyList();
+                    } else {
+                        controllerServiceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
+                    }
+                    
                     logger.trace("Parsing process group from DOM");
                     final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0);
                     final ProcessGroupDTO rootGroupDto = 
FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
-                    existingFlowEmpty = isEmpty(rootGroupDto);
+                    existingFlowEmpty = taskElements.isEmpty() && 
controllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
                     logger.debug("Existing Flow Empty = {}", 
existingFlowEmpty);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1abee296/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index e115fe7..0c472c8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@ -19,15 +19,13 @@ package org.apache.nifi.controller.tasks;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.scheduling.ScheduleState;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ReportingTaskWrapper implements Runnable {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ReportingTaskWrapper.class);
-
     private final ReportingTaskNode taskNode;
     private final ScheduleState scheduleState;
 
@@ -43,20 +41,23 @@ public class ReportingTaskWrapper implements Runnable {
         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
             
taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
         } catch (final Throwable t) {
-            logger.error("Error running task {} due to {}", 
taskNode.getReportingTask(), t.toString());
-            if (logger.isDebugEnabled()) {
-                logger.error("", t);
+            final ComponentLog componentLog = new 
SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
+            componentLog.error("Error running task {} due to {}", new Object[] 
{taskNode.getReportingTask(), t.toString()});
+            if (componentLog.isDebugEnabled()) {
+                componentLog.error("", t);
             }
         } finally {
-            // if the processor is no longer scheduled to run and this is the 
last thread,
-            // invoke the OnStopped methods
-            if (!scheduleState.isScheduled() && 
scheduleState.getActiveThreadCount() == 1 && 
scheduleState.mustCallOnStoppedMethods()) {
-                try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
org.apache.nifi.processor.annotation.OnStopped.class, 
taskNode.getReportingTask(), taskNode.getConfigurationContext());
+            try {
+                // if the reporting task is no longer scheduled to run and 
this is the last thread,
+                // invoke the OnStopped methods
+                if (!scheduleState.isScheduled() && 
scheduleState.getActiveThreadCount() == 1 && 
scheduleState.mustCallOnStoppedMethods()) {
+                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
org.apache.nifi.processor.annotation.OnStopped.class, 
taskNode.getReportingTask(), taskNode.getConfigurationContext());
+                    }
                 }
+            } finally {
+                scheduleState.decrementActiveThreadCount();
             }
-
-            scheduleState.decrementActiveThreadCount();
         }
     }
 

Reply via email to