NIFI-250: Fixed bugs with configuring reporting tasks on restart
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1abee296 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1abee296 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1abee296 Branch: refs/heads/NIFI-250 Commit: 1abee2964380bf2aee91189f471035fe3df92919 Parents: 712327f Author: Mark Payne <[email protected]> Authored: Tue Mar 31 10:58:53 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Tue Mar 31 10:58:53 2015 -0400 ---------------------------------------------------------------------- .../controller/StandardFlowSynchronizer.java | 19 ++++++++++++- .../controller/tasks/ReportingTaskWrapper.java | 29 ++++++++++---------- 2 files changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1abee296/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 243f7c5..201482c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -170,10 +171,26 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); } + final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); + final List<Element> taskElements; + if ( reportingTasksElement == null ) { + taskElements = Collections.emptyList(); + } else { + taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); + } + + final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); + final List<Element> controllerServiceElements; + if ( controllerServicesElement == null ) { + controllerServiceElements = Collections.emptyList(); + } else { + controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); + } + logger.trace("Parsing process group from DOM"); final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor); - existingFlowEmpty = isEmpty(rootGroupDto); + existingFlowEmpty = taskElements.isEmpty() && controllerServiceElements.isEmpty() && isEmpty(rootGroupDto); logger.debug("Existing Flow Empty = {}", existingFlowEmpty); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1abee296/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java index e115fe7..0c472c8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java @@ -19,15 +19,13 @@ package org.apache.nifi.controller.tasks; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.scheduling.ScheduleState; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.util.ReflectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ReportingTaskWrapper implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(ReportingTaskWrapper.class); - private final ReportingTaskNode taskNode; private final ScheduleState scheduleState; @@ -43,20 +41,23 @@ public class ReportingTaskWrapper implements Runnable { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { taskNode.getReportingTask().onTrigger(taskNode.getReportingContext()); } catch (final Throwable t) { - logger.error("Error running task {} due to {}", taskNode.getReportingTask(), t.toString()); - if (logger.isDebugEnabled()) { - logger.error("", t); + final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask()); + componentLog.error("Error running task {} due to {}", new Object[] {taskNode.getReportingTask(), t.toString()}); + if (componentLog.isDebugEnabled()) { + componentLog.error("", t); } } finally { - // if the processor is no longer scheduled to run and this is the last thread, - // invoke the OnStopped methods - if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); + try { + // if the reporting task is no longer scheduled to run and this is the last thread, + // invoke the OnStopped methods + if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); + } } + } finally { + scheduleState.decrementActiveThreadCount(); } - - scheduleState.decrementActiveThreadCount(); } }
