Repository: incubator-nifi Updated Branches: refs/heads/NIFI-250 50f0c123b -> d122a8363
NIFI-250: Fixed NPE Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d122a836 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d122a836 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d122a836 Branch: refs/heads/NIFI-250 Commit: d122a83633683285cc6fd6daf65f0f533a0da2ca Parents: 50f0c12 Author: Mark Payne <[email protected]> Authored: Fri Mar 13 12:01:00 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Mar 13 12:01:00 2015 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/controller/FlowController.java | 14 ++++++++++++++ .../nifi/controller/StandardFlowSynchronizer.java | 15 +++++++++++++++ 2 files changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d122a836/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index fade0e6..a04ae3a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -79,6 +79,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; +import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.reporting.StandardReportingTaskNode; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.CounterRepository; @@ -131,6 +132,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.groups.StandardProcessGroup; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; @@ -163,6 +165,8 @@ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -2507,6 +2511,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R taskNode.setName(task.getClass().getSimpleName()); if ( firstTimeAdded ) { + final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask()); + final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), + SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this); + + try { + task.initialize(config); + } catch (final InitializationException ie) { + throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie); + } + try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); } catch (final Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d122a836/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 01ad941..2348dcb 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -53,6 +53,7 @@ import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; @@ -64,10 +65,14 @@ import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.DomUtils; @@ -384,6 +389,16 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); + final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), + SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller); + + try { + reportingTask.getReportingTask().initialize(config); + } catch (final InitializationException ie) { + throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie); + } + if ( autoResumeState ) { if ( ScheduledState.RUNNING.name().equals(dto.getState()) ) { try {
