NIFI-5769: Refactored FlowController to use Composition over Inheritance
- Ensure that when root group is set, that we register its ID in FlowManager

This closes #3132.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/931bb0bc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/931bb0bc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/931bb0bc

Branch: refs/heads/master
Commit: 931bb0bc3b1c0205b260261ce9730af87204e115
Parents: 59e102a
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Oct 26 10:20:08 2018 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Tue Nov 6 11:23:33 2018 -0500

----------------------------------------------------------------------
 .../nifi/controller/AbstractComponentNode.java  |   45 +-
 .../apache/nifi/controller/ComponentNode.java   |   14 +-
 .../apache/nifi/controller/ProcessorNode.java   |   22 +-
 .../nifi/controller/flow/FlowManager.java       |  290 ++
 .../service/ControllerServiceProvider.java      |   32 +-
 .../nifi/logging/LogRepositoryFactory.java      |    4 +-
 .../nifi/reporting/UserAwareEventAccess.java    |   69 +
 .../validation/TriggerValidationTask.java       |   14 +-
 .../nifi/controller/ExtensionBuilder.java       |  470 +++
 .../apache/nifi/controller/FlowController.java  | 3038 ++----------------
 .../org/apache/nifi/controller/FlowSnippet.java |   47 +
 .../nifi/controller/StandardFlowService.java    |   74 +-
 .../nifi/controller/StandardFlowSnippet.java    |  619 ++++
 .../controller/StandardFlowSynchronizer.java    |  130 +-
 .../nifi/controller/StandardProcessorNode.java  |  102 +-
 .../controller/StandardReloadComponent.java     |  209 ++
 .../controller/flow/StandardFlowManager.java    |  656 ++++
 .../controller/kerberos/KerberosConfig.java     |   45 +
 .../server/StandardLoadBalanceProtocol.java     |    2 +-
 .../reporting/StandardReportingContext.java     |    8 +-
 .../StandardReportingInitializationContext.java |   28 +-
 .../reporting/StandardReportingTaskNode.java    |   13 +-
 .../repository/StandardQueueProvider.java       |   45 +
 .../scheduling/StandardProcessScheduler.java    |   14 +-
 .../serialization/StandardFlowSerializer.java   |    4 +-
 .../service/ControllerServiceLoader.java        |   19 +-
 .../service/GhostControllerService.java         |   82 +
 ...dControllerServiceInitializationContext.java |   20 +-
 .../StandardControllerServiceProvider.java      |  257 +-
 .../controller/state/StandardStateManager.java  |    6 +-
 .../nifi/controller/tasks/ConnectableTask.java  |    8 +-
 .../nifi/controller/tasks/ExpireFlowFiles.java  |    8 +-
 .../nifi/groups/StandardProcessGroup.java       |  141 +-
 .../repository/StandardLogRepository.java       |    6 +-
 .../StandardProcessorInitializationContext.java |   20 +-
 .../provenance/ComponentIdentifierLookup.java   |   71 +
 .../StandardProvenanceAuthorizableFactory.java  |  119 +
 .../nifi/remote/StandardRemoteProcessGroup.java |   81 +-
 .../nifi/reporting/StandardEventAccess.java     |  691 ++++
 .../controller/StandardFlowServiceSpec.groovy   |   10 +-
 .../nifi/controller/TestFlowController.java     |  101 +-
 .../controller/TestStandardProcessorNode.java   |   12 +-
 .../queue/clustered/LoadBalancedQueueIT.java    |    6 +-
 .../server/TestStandardLoadBalanceProtocol.java |    5 +-
 .../reporting/TestStandardReportingContext.java |    2 +-
 .../scheduling/ProcessorLifecycleIT.java        |  277 +-
 .../scheduling/StandardProcessSchedulerIT.java  |  100 -
 .../TestStandardProcessScheduler.java           |  143 +-
 .../StandardFlowSerializerTest.java             |    6 +-
 .../StandardControllerServiceProviderIT.java    |   58 +-
 .../StandardControllerServiceProviderTest.java  |   55 +-
 .../TestStandardControllerServiceProvider.java  |  196 +-
 .../service/mock/MockProcessGroup.java          |    8 +-
 .../StandardExtensionDiscoveringManager.java    |    2 +-
 .../apache/nifi/web/api/VersionsResource.java   |    4 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |    3 +-
 .../nifi/web/controller/ControllerFacade.java   |  114 +-
 .../web/controller/ControllerSearchService.java |    2 +-
 .../apache/nifi/web/dao/impl/ComponentDAO.java  |    2 +-
 .../web/dao/impl/StandardConnectionDAO.java     |    8 +-
 .../dao/impl/StandardControllerServiceDAO.java  |   26 +-
 .../nifi/web/dao/impl/StandardFunnelDAO.java    |    8 +-
 .../nifi/web/dao/impl/StandardInputPortDAO.java |   10 +-
 .../nifi/web/dao/impl/StandardLabelDAO.java     |    8 +-
 .../web/dao/impl/StandardOutputPortDAO.java     |   10 +-
 .../web/dao/impl/StandardProcessGroupDAO.java   |   25 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java |   15 +-
 .../dao/impl/StandardRemoteProcessGroupDAO.java |   12 +-
 .../nifi/web/dao/impl/StandardSnippetDAO.java   |   24 +-
 .../nifi/web/dao/impl/StandardTemplateDAO.java  |   16 +-
 .../ControllerServiceProviderFactoryBean.java   |    2 +-
 .../org/apache/nifi/web/util/SnippetUtils.java  |    4 +-
 .../src/main/resources/nifi-web-api-context.xml |   15 +-
 .../web/dao/impl/StandardTemplateDAOSpec.groovy |   12 +-
 .../impl/TestStandardRemoteProcessGroupDAO.java |    7 +-
 75 files changed, 4837 insertions(+), 3994 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index ab9ece0..f3ae41f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -16,26 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.bundle.Bundle;
@@ -58,6 +38,26 @@ import 
org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
 public abstract class AbstractComponentNode implements ComponentNode {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractComponentNode.class);
 
@@ -85,17 +85,16 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
     public AbstractComponentNode(final String id,
                                  final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider,
                                  final String componentType, final String 
componentCanonicalClass, final ComponentVariableRegistry variableRegistry,
-                                 final ReloadComponent reloadComponent, final 
ExtensionManager extensionManager, final ValidationTrigger validationTrigger,
-                                 final boolean isExtensionMissing) {
+                                 final ReloadComponent reloadComponent, final 
ExtensionManager extensionManager, final ValidationTrigger validationTrigger, 
final boolean isExtensionMissing) {
         this.id = id;
         this.validationContextFactory = validationContextFactory;
         this.serviceProvider = serviceProvider;
         this.name = new AtomicReference<>(componentType);
         this.componentType = componentType;
         this.componentCanonicalClass = componentCanonicalClass;
+        this.reloadComponent = reloadComponent;
         this.variableRegistry = variableRegistry;
         this.validationTrigger = validationTrigger;
-        this.reloadComponent = reloadComponent;
         this.extensionManager = extensionManager;
         this.isExtensionMissing = new AtomicBoolean(isExtensionMissing);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index 707bb75..d0ed572 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationResult;
 import org.apache.nifi.authorization.AuthorizationResult.Result;
@@ -39,6 +32,13 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 public interface ComponentNode extends ComponentAuthorizable {
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index f3adab0..6e8206e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
@@ -40,6 +32,14 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class ProcessorNode extends AbstractComponentNode implements 
Connectable {
 
     protected final AtomicReference<ScheduledState> scheduledState;
@@ -176,6 +176,8 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
      *            initiate processor <i>start</i> task
      * @param administrativeYieldMillis
      *            the amount of milliseconds to wait for administrative yield
+     * @param timeoutMillis the number of milliseconds to wait after 
triggering the Processor's @OnScheduled methods before timing out and 
considering
+     * the startup a failure. This will result in the thread being interrupted 
and trying again.
      * @param processContext
      *            the instance of {@link ProcessContext}
      * @param schedulingAgentCallback
@@ -186,8 +188,8 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
      *            value is <code>true</code> or if the Processor is in any 
state other than 'STOPPING' or 'RUNNING', then this method
      *            will throw an {@link IllegalStateException}.
      */
-    public abstract void start(ScheduledExecutorService scheduler,
-        long administrativeYieldMillis, ProcessContext processContext, 
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping);
+    public abstract void start(ScheduledExecutorService scheduler, long 
administrativeYieldMillis, long timeoutMillis, ProcessContext processContext,
+                               SchedulingAgentCallback 
schedulingAgentCallback, boolean failIfStopping);
 
     /**
      * Will stop the {@link Processor} represented by this {@link 
ProcessorNode}.

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
new file mode 100644
index 0000000..c741f33
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.flow;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+
+public interface FlowManager {
+    String ROOT_GROUP_ID_ALIAS = "root";
+    String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
+
+    /**
+     * Creates a Port to use as an Input Port for receiving data via 
Site-to-Site communications
+     *
+     * @param id port id
+     * @param name port name
+     * @return new port
+     * @throws NullPointerException if the ID or name is null
+     * @throws IllegalStateException if a Port already exists with the same id.
+     */
+    Port createRemoteInputPort(String id, String name);
+
+    /**
+     * Creates a Port to use as an Output Port for transferring data via 
Site-to-Site communications
+     *
+     * @param id port id
+     * @param name port name
+     * @return new port
+     * @throws NullPointerException if the ID or name is null
+     * @throws IllegalStateException if a Port already exists with the same id.
+     */
+    Port createRemoteOutputPort(String id, String name);
+
+    /**
+     * Creates a new Remote Process Group with the given ID that points to the 
given URI
+     *
+     * @param id Remote Process Group ID
+     * @param uris group URIs; multiple URIs can be specified in 
comma-separated format
+     * @return new remote process group
+     * @throws NullPointerException if either argument is null
+     * @throws IllegalArgumentException if any of the <code>uri</code>s is not 
a valid URI.
+     */
+    RemoteProcessGroup createRemoteProcessGroup(String id, String uris);
+
+    /**
+     * @return the ProcessGroup that is currently assigned as the Root Group
+     */
+    ProcessGroup getRootGroup();
+
+    String getRootGroupId();
+
+    /**
+     * Creates an instance of the given snippet and adds the components to the 
given group
+     *
+     * @param group group
+     * @param dto dto
+     *
+     * @throws NullPointerException if either argument is null
+     * @throws IllegalStateException if the snippet is not valid because a
+     * component in the snippet has an ID that is not unique to this flow, or
+     * because it shares an Input Port or Output Port at the root level whose
+     * name already exists in the given ProcessGroup, or because the Template
+     * contains a Processor or a Prioritizer whose class is not valid within
+     * this instance of NiFi.
+     * @throws ProcessorInstantiationException if unable to instantiate a
+     * processor
+     */
+    void instantiateSnippet(ProcessGroup group, FlowSnippetDTO dto) throws 
ProcessorInstantiationException;
+
+    /**
+     * Indicates whether or not the two IDs point to the same ProcessGroup. If
+     * either id is null, will return <code>false</code>.
+     *
+     * @param id1 group id
+     * @param id2 other group id
+     * @return true if same
+     */
+    boolean areGroupsSame(String id1, String id2);
+
+    /**
+     * Creates a new instance of the FlowFilePrioritizer with the given type
+     * @param type the type of the prioritizer (fully qualified class name)
+     * @return the newly created FlowFile Prioritizer
+     */
+    FlowFilePrioritizer createPrioritizer(String type) throws 
InstantiationException, IllegalAccessException, ClassNotFoundException;
+
+    /**
+     * Returns the ProcessGroup with the given ID, or null if no group exists 
with the given ID.
+     * @param id id of the group
+     * @return the ProcessGroup with the given ID or null if none can be found
+     */
+    ProcessGroup getGroup(String id);
+
+    void onProcessGroupAdded(ProcessGroup group);
+
+    void onProcessGroupRemoved(ProcessGroup group);
+
+
+    /**
+     * Finds the Connectable with the given ID, or null if no such Connectable 
exists
+     * @param id the ID of the Connectable
+     * @return the Connectable with the given ID, or null if no such 
Connectable exists
+     */
+    Connectable findConnectable(String id);
+
+    /**
+     * Returns the ProcessorNode with the given ID
+     * @param id the ID of the Processor
+     * @return the ProcessorNode with the given ID or null if no such 
Processor exists
+     */
+    ProcessorNode getProcessorNode(String id);
+
+    void onProcessorAdded(ProcessorNode processor);
+
+    void onProcessorRemoved(ProcessorNode processor);
+
+
+    /**
+     * <p>
+     * Creates a new ProcessorNode with the given type and identifier and
+     * initializes it invoking the methods annotated with {@link 
org.apache.nifi.annotation.lifecycle.OnAdded}.
+     * </p>
+     *
+     * @param type processor type
+     * @param id processor id
+     * @param coordinate the coordinate of the bundle for this processor
+     * @return new processor
+     * @throws NullPointerException if either arg is null
+     */
+    ProcessorNode createProcessor(String type, String id, BundleCoordinate 
coordinate);
+
+    /**
+     * <p>
+     * Creates a new ProcessorNode with the given type and identifier and
+     * optionally initializes it.
+     * </p>
+     *
+     * @param type the fully qualified Processor class name
+     * @param id the unique ID of the Processor
+     * @param coordinate the bundle coordinate for this processor
+     * @param firstTimeAdded whether or not this is the first time this
+     * Processor is added to the graph. If {@code true}, will invoke methods
+     * annotated with the {@link org.apache.nifi.annotation.lifecycle.OnAdded} 
annotation.
+     * @return new processor node
+     * @throws NullPointerException if either arg is null
+     */
+    ProcessorNode createProcessor(String type, String id, BundleCoordinate 
coordinate, boolean firstTimeAdded);
+
+    /**
+     * <p>
+     * Creates a new ProcessorNode with the given type and identifier and
+     * optionally initializes it.
+     * </p>
+     *
+     * @param type the fully qualified Processor class name
+     * @param id the unique ID of the Processor
+     * @param coordinate the bundle coordinate for this processor
+     * @param firstTimeAdded whether or not this is the first time this
+     * Processor is added to the graph. If {@code true}, will invoke methods
+     * annotated with the {@link org.apache.nifi.annotation.lifecycle.OnAdded} 
annotation.
+     * @return new processor node
+     * @throws NullPointerException if either arg is null
+     */
+    ProcessorNode createProcessor(String type, String id, BundleCoordinate 
coordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean 
registerLogObserver);
+
+
+
+    Label createLabel(String id, String text);
+
+    Funnel createFunnel(String id);
+
+    Port createLocalInputPort(String id, String name);
+
+    Port createLocalOutputPort(String id, String name);
+
+    ProcessGroup createProcessGroup(String id);
+
+
+
+    void onConnectionAdded(Connection connection);
+
+    void onConnectionRemoved(Connection connection);
+
+    Connection getConnection(String id);
+
+    Set<Connection> findAllConnections();
+
+    /**
+     * Creates a connection between two Connectable objects.
+     *
+     * @param id required ID of the connection
+     * @param name the name of the connection, or <code>null</code> to leave 
the connection unnamed
+     * @param source required source
+     * @param destination required destination
+     * @param relationshipNames required collection of relationship names
+     * @return the created Connection
+     *
+     * @throws NullPointerException if the ID, source, destination, or set of 
relationships is null.
+     * @throws IllegalArgumentException if <code>relationships</code> is an 
empty collection
+     */
+    Connection createConnection(final String id, final String name, final 
Connectable source, final Connectable destination, final Collection<String> 
relationshipNames);
+
+
+
+    void onInputPortAdded(Port inputPort);
+
+    void onInputPortRemoved(Port inputPort);
+
+    Port getInputPort(String id);
+
+
+
+    void onOutputPortAdded(Port outputPort);
+
+    void onOutputPortRemoved(Port outputPort);
+
+    Port getOutputPort(String id);
+
+
+
+    void onFunnelAdded(Funnel funnel);
+
+    void onFunnelRemoved(Funnel funnel);
+
+    Funnel getFunnel(String id);
+
+
+
+    ReportingTaskNode createReportingTask(String type, BundleCoordinate 
bundleCoordinate);
+
+    ReportingTaskNode createReportingTask(String type, BundleCoordinate 
bundleCoordinate, boolean firstTimeAdded);
+
+    ReportingTaskNode createReportingTask(String type, String id, 
BundleCoordinate bundleCoordinate, boolean firstTimeAdded);
+
+    ReportingTaskNode createReportingTask(String type, String id, 
BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean 
firstTimeAdded, boolean register);
+
+    ReportingTaskNode getReportingTaskNode(String taskId);
+
+    void removeReportingTask(ReportingTaskNode reportingTask);
+
+    Set<ReportingTaskNode> getAllReportingTasks();
+
+
+
+    Set<ControllerServiceNode> getAllControllerServices();
+
+    ControllerServiceNode getControllerServiceNode(String id);
+
+    ControllerServiceNode createControllerService(String type, String id, 
BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean 
firstTimeAdded,
+                                                         boolean 
registerLogObserver);
+
+
+    Set<ControllerServiceNode> getRootControllerServices();
+
+    void addRootControllerService(ControllerServiceNode serviceNode);
+
+    ControllerServiceNode getRootControllerService(String serviceIdentifier);
+
+    void removeRootControllerService(final ControllerServiceNode service);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 95eb6a5..15033b9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,37 +16,26 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-
-import org.apache.nifi.annotation.lifecycle.OnAdded;
-import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.nar.ExtensionManager;
 
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
 /**
  *
  */
 public interface ControllerServiceProvider extends ControllerServiceLookup {
 
     /**
-     * Creates a new Controller Service of the specified type and assigns it 
the
-     * given id. If <code>firstTimeadded</code> is true, calls any methods that
-     * are annotated with {@link OnAdded}
-     *
-     * @param type of service
-     * @param id of service
-     * @param bundleCoordinate the coordinate of the bundle for the service
-     * @param additionalUrls optional additional URL resources to add to the 
class loader of the component
-     * @param firstTimeAdded for service
-     * @return the service node
+     * Notifies the ControllerServiceProvider that the given Controller 
Service has been added to the flow
+     * @param serviceNode the Controller Service Node
      */
-    ControllerServiceNode createControllerService(String type, String id, 
BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean 
firstTimeAdded);
+    void onControllerServiceAdded(ControllerServiceNode serviceNode);
 
     /**
      * @param id of the service
@@ -114,10 +103,9 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
     Future<Void> 
disableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes);
 
     /**
-     * @return a Set of all Controller Services that exist for this service
-     *         provider
+     * @return a Collection of the non-root Controller Services that exist for this service 
provider
      */
-    Set<ControllerServiceNode> getAllControllerServices();
+    Collection<ControllerServiceNode> getNonRootControllerServices();
 
     /**
      * Verifies that all running Processors and Reporting Tasks referencing the

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index 530b6ef..3b3a072 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -16,12 +16,12 @@
  */
 package org.apache.nifi.logging;
 
-import static java.util.Objects.requireNonNull;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
 
 @SuppressWarnings("unchecked")
 public class LogRepositoryFactory {

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java
new file mode 100644
index 0000000..32b2e01
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/reporting/UserAwareEventAccess.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.repository.RepositoryStatusReport;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+
+/**
+ * An {@link EventAccess} whose process-group status queries are issued on
+ * behalf of a specific {@link NiFiUser}, so that the returned status is
+ * filtered according to that user.
+ */
+public interface UserAwareEventAccess extends EventAccess {
+    /**
+     * Returns the status for components in the specified group. This request
+     * is made by the specified user so the results will be filtered
+     * accordingly.
+     *
+     * @param groupId group id
+     * @param user user making request
+     * @param recursiveStatusDepth the number of levels deep we should recurse
+     *            and still include the processors' statuses, the groups'
+     *            statuses, etc. in the returned ProcessGroupStatus
+     * @return the component status
+     */
+    ProcessGroupStatus getGroupStatus(String groupId, NiFiUser user, int 
recursiveStatusDepth);
+
+
+    /**
+     * Returns the status for the components in the specified group with the
+     * specified report. This request is made by the specified user so the
+     * results will be filtered accordingly.
+     *
+     * @param groupId group id
+     * @param statusReport report
+     * @param user user making request
+     * @return the component status
+     */
+    ProcessGroupStatus getGroupStatus(String groupId, RepositoryStatusReport 
statusReport, NiFiUser user);
+
+    /**
+     * Returns the status for components in the specified group. This request
+     * is made by the specified user so the results will be filtered
+     * accordingly.
+     *
+     * @param groupId group id
+     * @param user user making request
+     * @return the component status
+     */
+    ProcessGroupStatus getGroupStatus(String groupId, NiFiUser user);
+
+    /**
+     * Returns the status for the components in the specified group with the
+     * specified report. This request is made by the specified user so the
+     * results will be filtered accordingly.
+     *
+     * @param groupId group id
+     * @param statusReport report
+     * @param user user making request
+     * @param recursiveStatusDepth the number of levels deep we should recurse
+     *            and still include the processors' statuses, the groups'
+     *            statuses, etc. in the returned ProcessGroupStatus
+     * @return the component status
+     */
+    ProcessGroupStatus getGroupStatus(String groupId, RepositoryStatusReport 
statusReport, NiFiUser user, int recursiveStatusDepth);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
index 0665dd2..b49d52e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
@@ -18,18 +18,18 @@
 package org.apache.nifi.components.validation;
 
 import org.apache.nifi.controller.ComponentNode;
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TriggerValidationTask implements Runnable {
     private static final Logger logger = 
LoggerFactory.getLogger(TriggerValidationTask.class);
 
-    private final FlowController controller;
+    private final FlowManager flowManager;
     private final ValidationTrigger validationTrigger;
 
-    public TriggerValidationTask(final FlowController controller, final 
ValidationTrigger validationTrigger) {
-        this.controller = controller;
+    public TriggerValidationTask(final FlowManager flowManager, final 
ValidationTrigger validationTrigger) {
+        this.flowManager = flowManager;
         this.validationTrigger = validationTrigger;
     }
 
@@ -38,15 +38,15 @@ public class TriggerValidationTask implements Runnable {
         try {
             logger.debug("Triggering validation of all components");
 
-            for (final ComponentNode node : 
controller.getAllControllerServices()) {
+            for (final ComponentNode node : 
flowManager.getAllControllerServices()) {
                 validationTrigger.trigger(node);
             }
 
-            for (final ComponentNode node : controller.getAllReportingTasks()) 
{
+            for (final ComponentNode node : 
flowManager.getAllReportingTasks()) {
                 validationTrigger.trigger(node);
             }
 
-            for (final ComponentNode node : 
controller.getRootGroup().findAllProcessors()) {
+            for (final ComponentNode node : 
flowManager.getRootGroup().findAllProcessors()) {
                 validationTrigger.trigger(node);
             }
         } catch (final Throwable t) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
new file mode 100644
index 0000000..e897dd2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import 
org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.GhostControllerService;
+import 
org.apache.nifi.controller.service.StandardControllerServiceInitializationContext;
+import 
org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler;
+import org.apache.nifi.controller.service.StandardControllerServiceNode;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.GhostProcessor;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessorInitializationContext;
+import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
+import org.apache.nifi.reporting.GhostReportingTask;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Proxy;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ExtensionBuilder {
+    private static final Logger logger = 
LoggerFactory.getLogger(ExtensionBuilder.class);
+
+    private String type;
+    private String identifier;
+    private BundleCoordinate bundleCoordinate;
+    private ExtensionManager extensionManager;
+    private Set<URL> classpathUrls;
+    private KerberosConfig kerberosConfig = KerberosConfig.NOT_CONFIGURED;
+    private ControllerServiceProvider serviceProvider;
+    private NodeTypeProvider nodeTypeProvider;
+    private VariableRegistry variableRegistry;
+    private ProcessScheduler processScheduler;
+    private ValidationTrigger validationTrigger;
+    private ReloadComponent reloadComponent;
+    private FlowController flowController;
+    private StateManagerProvider stateManagerProvider;
+
+    /**
+     * Sets the class name of the extension to build; the build methods load
+     * it via {@code Class.forName} and refuse to run while it is unset.
+     *
+     * @param type the extension's class name
+     * @return this builder
+     */
+    public ExtensionBuilder type(final String type) {
+        this.type = type;
+        return this;
+    }
+
+    /**
+     * Sets the identifier given to the component that is built. Required by
+     * every build method.
+     *
+     * @param identifier the component identifier
+     * @return this builder
+     */
+    public ExtensionBuilder identifier(final String identifier) {
+        this.identifier = identifier;
+        return this;
+    }
+
+    /**
+     * Sets the coordinate used to look up the extension's bundle in the
+     * ExtensionManager. Required by every build method.
+     *
+     * @param coordinate the bundle coordinate
+     * @return this builder
+     */
+    public ExtensionBuilder bundleCoordinate(final BundleCoordinate 
coordinate) {
+        this.bundleCoordinate = coordinate;
+        return this;
+    }
+
+    /**
+     * Adds extra classpath URLs that are handed to the instance class loader
+     * when the extension is created. May be called repeatedly; URLs
+     * accumulate. A {@code null} or empty set is ignored.
+     *
+     * @param urls the URLs to add, may be {@code null}
+     * @return this builder
+     */
+    public ExtensionBuilder addClasspathUrls(final Set<URL> urls) {
+        final boolean hasUrls = urls != null && !urls.isEmpty();
+        if (hasUrls) {
+            // The backing set is created lazily, on the first non-empty addition.
+            if (classpathUrls == null) {
+                classpathUrls = new HashSet<>();
+            }
+            classpathUrls.addAll(urls);
+        }
+
+        return this;
+    }
+
+    /**
+     * Sets the Kerberos configuration passed to the initialization context of
+     * the component that is built. When never called, the builder uses
+     * {@code KerberosConfig.NOT_CONFIGURED}.
+     *
+     * @param kerberosConfig the Kerberos configuration
+     * @return this builder
+     */
+    public ExtensionBuilder kerberosConfig(final KerberosConfig 
kerberosConfig) {
+        this.kerberosConfig = kerberosConfig;
+        return this;
+    }
+
+    /**
+     * Sets the Controller Service Provider handed to the created component
+     * and its validation context factory. Required by every build method.
+     *
+     * @param serviceProvider the controller service provider
+     * @return this builder
+     */
+    public ExtensionBuilder controllerServiceProvider(final 
ControllerServiceProvider serviceProvider) {
+        this.serviceProvider = serviceProvider;
+        return this;
+    }
+
+    /**
+     * Sets the Node Type Provider passed to the Processor and Reporting Task
+     * initialization contexts. Required by every build method.
+     *
+     * @param nodeTypeProvider the node type provider
+     * @return this builder
+     */
+    public ExtensionBuilder nodeTypeProvider(final NodeTypeProvider 
nodeTypeProvider) {
+        this.nodeTypeProvider = nodeTypeProvider;
+        return this;
+    }
+
+    /**
+     * Sets the Variable Registry that is wrapped in a
+     * StandardComponentVariableRegistry for the component that is built.
+     * Required by every build method.
+     *
+     * @param variableRegistry the variable registry
+     * @return this builder
+     */
+    public ExtensionBuilder variableRegistry(final VariableRegistry 
variableRegistry) {
+        this.variableRegistry = variableRegistry;
+        return this;
+    }
+
+    /**
+     * Sets the Process Scheduler passed to the Processor and Reporting Task
+     * nodes. The build methods do not check that it has been set.
+     *
+     * @param scheduler the process scheduler
+     * @return this builder
+     */
+    public ExtensionBuilder processScheduler(final ProcessScheduler scheduler) 
{
+        this.processScheduler = scheduler;
+        return this;
+    }
+
+    /**
+     * Sets the Validation Trigger passed to the node that is built. The build
+     * methods do not check that it has been set.
+     *
+     * @param validationTrigger the validation trigger
+     * @return this builder
+     */
+    public ExtensionBuilder validationTrigger(final ValidationTrigger 
validationTrigger) {
+        this.validationTrigger = validationTrigger;
+        return this;
+    }
+
+    /**
+     * Sets the ReloadComponent passed to the node that is built. Required by
+     * every build method.
+     *
+     * @param reloadComponent the reload component
+     * @return this builder
+     */
+    public ExtensionBuilder reloadComponent(final ReloadComponent 
reloadComponent) {
+        this.reloadComponent = reloadComponent;
+        return this;
+    }
+
+    /**
+     * Sets the FlowController passed to Reporting Task nodes. Only
+     * {@link #buildReportingTask()} requires it.
+     *
+     * @param flowController the flow controller
+     * @return this builder
+     */
+    public ExtensionBuilder flowController(final FlowController 
flowController) {
+        this.flowController = flowController;
+        return this;
+    }
+
+    /**
+     * Sets the provider from which a Controller Service's StateManager is
+     * obtained. Only {@link #buildControllerService()} requires it.
+     *
+     * @param stateManagerProvider the state manager provider
+     * @return this builder
+     */
+    public ExtensionBuilder stateManagerProvider(final StateManagerProvider 
stateManagerProvider) {
+        this.stateManagerProvider = stateManagerProvider;
+        return this;
+    }
+
+    /**
+     * Sets the ExtensionManager used to resolve the bundle and create the
+     * instance class loader. Required by every build method.
+     *
+     * @param extensionManager the extension manager
+     * @return this builder
+     */
+    public ExtensionBuilder extensionManager(final ExtensionManager 
extensionManager) {
+        this.extensionManager = extensionManager;
+        return this;
+    }
+
+    /**
+     * Builds a ProcessorNode from the values configured on this builder.
+     * <p>
+     * If the Processor cannot be instantiated, the failure is logged and a
+     * GhostProcessor carrying the same identifier and class name is wrapped
+     * in the node instead, so this method still returns a node.
+     *
+     * @return the newly created processor node
+     * @throws IllegalStateException if the identifier, type, bundle
+     *             coordinate, extension manager, controller service provider,
+     *             node type provider, variable registry or reload component
+     *             has not been set
+     */
+    public ProcessorNode buildProcessor() {
+        if (identifier == null) {
+            throw new IllegalStateException("Processor ID must be specified");
+        }
+        if (type == null) {
+            throw new IllegalStateException("Processor Type must be specified");
+        }
+        if (bundleCoordinate == null) {
+            throw new IllegalStateException("Bundle Coordinate must be specified");
+        }
+        if (extensionManager == null) {
+            throw new IllegalStateException("Extension Manager must be specified");
+        }
+        if (serviceProvider == null) {
+            throw new IllegalStateException("Controller Service Provider must be specified");
+        }
+        if (nodeTypeProvider == null) {
+            throw new IllegalStateException("Node Type Provider must be specified");
+        }
+        if (variableRegistry == null) {
+            throw new IllegalStateException("Variable Registry must be specified");
+        }
+        if (reloadComponent == null) {
+            throw new IllegalStateException("Reload Component must be specified");
+        }
+
+        LoggableComponent<Processor> component;
+        boolean ghosted = false;
+        try {
+            component = createLoggableProcessor();
+        } catch (final ProcessorInstantiationException pie) {
+            logger.error("Could not create Processor of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", pie);
+
+            // Fall back to a placeholder that remembers which processor was requested.
+            final GhostProcessor placeholder = new GhostProcessor();
+            placeholder.setIdentifier(identifier);
+            placeholder.setCanonicalClassName(type);
+            component = new LoggableComponent<>(placeholder, bundleCoordinate, null);
+            ghosted = true;
+        }
+
+        return createProcessorNode(component, !ghosted);
+    }
+
+    /**
+     * Builds a ReportingTaskNode from the values configured on this builder.
+     * <p>
+     * If the Reporting Task cannot be instantiated, the failure is logged and
+     * a GhostReportingTask carrying the same identifier and class name is
+     * wrapped in the node instead, so this method still returns a node.
+     *
+     * @return the newly created reporting task node
+     * @throws IllegalStateException if the identifier, type, bundle
+     *             coordinate, extension manager, controller service provider,
+     *             node type provider, variable registry, reload component or
+     *             flow controller has not been set
+     */
+    public ReportingTaskNode buildReportingTask() {
+        if (identifier == null) {
+            throw new IllegalStateException("ReportingTask ID must be specified");
+        }
+        if (type == null) {
+            throw new IllegalStateException("ReportingTask Type must be specified");
+        }
+        if (bundleCoordinate == null) {
+            throw new IllegalStateException("Bundle Coordinate must be specified");
+        }
+        if (extensionManager == null) {
+            throw new IllegalStateException("Extension Manager must be specified");
+        }
+        if (serviceProvider == null) {
+            throw new IllegalStateException("Controller Service Provider must be specified");
+        }
+        if (nodeTypeProvider == null) {
+            throw new IllegalStateException("Node Type Provider must be specified");
+        }
+        if (variableRegistry == null) {
+            throw new IllegalStateException("Variable Registry must be specified");
+        }
+        if (reloadComponent == null) {
+            throw new IllegalStateException("Reload Component must be specified");
+        }
+        if (flowController == null) {
+            throw new IllegalStateException("FlowController must be specified");
+        }
+
+        LoggableComponent<ReportingTask> component;
+        boolean ghosted = false;
+        try {
+            component = createLoggableReportingTask();
+        } catch (final ReportingTaskInstantiationException rtie) {
+            logger.error("Could not create ReportingTask of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", rtie);
+
+            // Fall back to a placeholder that remembers which task was requested.
+            final GhostReportingTask placeholder = new GhostReportingTask();
+            placeholder.setIdentifier(identifier);
+            placeholder.setCanonicalClassName(type);
+            component = new LoggableComponent<>(placeholder, bundleCoordinate, null);
+            ghosted = true;
+        }
+
+        return createReportingTaskNode(component, !ghosted);
+    }
+
+    /**
+     * Builds a ControllerServiceNode from the values configured on this
+     * builder.
+     * <p>
+     * If creating the service fails for any reason, the failure is logged and
+     * a "Ghost" Controller Service node is returned in its place.
+     *
+     * @return the newly created controller service node
+     * @throws IllegalStateException if the identifier, type, bundle
+     *             coordinate, extension manager, controller service provider,
+     *             node type provider, variable registry, reload component or
+     *             state manager provider has not been set
+     */
+    public ControllerServiceNode buildControllerService() {
+        // The messages name the Controller Service (not a ReportingTask) so that a
+        // misconfigured builder is reported against the component actually being built.
+        if (identifier == null) {
+            throw new IllegalStateException("Controller Service ID must be specified");
+        }
+        if (type == null) {
+            throw new IllegalStateException("Controller Service Type must be specified");
+        }
+        if (bundleCoordinate == null) {
+            throw new IllegalStateException("Bundle Coordinate must be specified");
+        }
+        if (extensionManager == null) {
+            throw new IllegalStateException("Extension Manager must be specified");
+        }
+        if (serviceProvider == null) {
+            throw new IllegalStateException("Controller Service Provider must be specified");
+        }
+        if (nodeTypeProvider == null) {
+            throw new IllegalStateException("Node Type Provider must be specified");
+        }
+        if (variableRegistry == null) {
+            throw new IllegalStateException("Variable Registry must be specified");
+        }
+        if (reloadComponent == null) {
+            throw new IllegalStateException("Reload Component must be specified");
+        }
+        if (stateManagerProvider == null) {
+            throw new IllegalStateException("State Manager Provider must be specified");
+        }
+
+        try {
+            return createControllerServiceNode();
+        } catch (final Exception e) {
+            // Any failure (class loading, instantiation, initialization, missing bundle)
+            // results in a ghost node rather than an exception to the caller.
+            logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", e);
+            return createGhostControllerServiceNode();
+        }
+    }
+
+
+    /**
+     * Wraps the given Processor in a StandardProcessorNode and applies any
+     * {@code DefaultSettings} annotation found on the processor class.
+     *
+     * @param processor the processor (or ghost processor) and its logger
+     * @param creationSuccessful {@code false} if {@code processor} is a ghost
+     *            standing in for a processor that could not be created; the
+     *            node is then given a "(Missing) &lt;simple class name&gt;"
+     *            component type
+     * @return the processor node
+     */
+    private ProcessorNode createProcessorNode(final LoggableComponent<Processor> processor, final boolean creationSuccessful) {
+        final ComponentVariableRegistry componentRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
+        final ValidationContextFactory contextFactory = new StandardValidationContextFactory(serviceProvider, componentRegistry);
+
+        final ProcessorNode node;
+        if (!creationSuccessful) {
+            // Derive a display type from the last segment of the requested class name.
+            final String simpleName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
+            final String missingType = "(Missing) " + simpleName;
+            node = new StandardProcessorNode(processor, identifier, contextFactory, processScheduler, serviceProvider,
+                missingType, type, componentRegistry, reloadComponent, extensionManager, validationTrigger, true);
+        } else {
+            node = new StandardProcessorNode(processor, identifier, contextFactory, processScheduler, serviceProvider,
+                componentRegistry, reloadComponent, extensionManager, validationTrigger);
+        }
+
+        applyDefaultSettings(node);
+        return node;
+    }
+
+
+    /**
+     * Wraps the given Reporting Task in a StandardReportingTaskNode and gives
+     * the node its initial name.
+     *
+     * @param reportingTask the reporting task (or ghost task) and its logger
+     * @param creationSuccessful {@code false} if {@code reportingTask} is a
+     *            ghost standing in for a task that could not be created; the
+     *            node is then typed and named
+     *            "(Missing) &lt;simple class name&gt;"
+     * @return the reporting task node
+     */
+    private ReportingTaskNode createReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final boolean creationSuccessful) {
+        final ComponentVariableRegistry componentRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
+        final ValidationContextFactory contextFactory = new StandardValidationContextFactory(serviceProvider, componentRegistry);
+
+        if (creationSuccessful) {
+            final ReportingTaskNode realNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler,
+                contextFactory, componentRegistry, reloadComponent, extensionManager, validationTrigger);
+            // A real task is named after its implementation class.
+            realNode.setName(realNode.getReportingTask().getClass().getSimpleName());
+            return realNode;
+        }
+
+        final String simpleName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
+        final String missingType = "(Missing) " + simpleName;
+
+        final ReportingTaskNode ghostNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler, contextFactory,
+            missingType, type, componentRegistry, reloadComponent, extensionManager, validationTrigger, true);
+        ghostNode.setName(missingType);
+        return ghostNode;
+    }
+
+    /**
+     * Copies the yield duration, penalty duration and bulletin level from the
+     * processor class's {@code DefaultSettings} annotation, if present, onto
+     * the node. Any exception raised while doing so is logged and swallowed.
+     *
+     * @param processorNode the node to configure
+     */
+    private void applyDefaultSettings(final ProcessorNode processorNode) {
+        try {
+            final DefaultSettings settings = processorNode.getProcessor().getClass().getAnnotation(DefaultSettings.class);
+            if (settings == null) {
+                return;
+            }
+
+            processorNode.setYieldPeriod(settings.yieldDuration());
+            processorNode.setPenalizationPeriod(settings.penaltyDuration());
+            processorNode.setBulletinLevel(settings.bulletinLevel());
+        } catch (final Exception ex) {
+            logger.error("Error while setting default settings from DefaultSettings annotation: {}", ex.toString(), ex);
+        }
+    }
+
+    /**
+     * Instantiates the Controller Service named by {@code type}, wraps it in
+     * a dynamic proxy over all of its interfaces, initializes it and returns
+     * a StandardControllerServiceNode that holds both the original and the
+     * proxied instance.
+     * <p>
+     * While the service is created the thread's context class loader is set
+     * to the instance class loader obtained from the ExtensionManager.
+     *
+     * @return the controller service node
+     * @throws ClassNotFoundException if {@code type} cannot be loaded
+     * @throws IllegalAccessException if the service cannot be instantiated
+     * @throws InstantiationException if the service cannot be instantiated
+     * @throws InitializationException declared for the call to the service's {@code initialize}
+     * @throws IllegalStateException if no bundle exists for the configured coordinate
+     */
+    private ControllerServiceNode createControllerServiceNode() throws 
ClassNotFoundException, IllegalAccessException, InstantiationException, 
InitializationException {
+        // Remember the caller's context class loader so it can be put back in the finally block.
+        final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
+            if (bundle == null) {
+                throw new IllegalStateException("Unable to find bundle for 
coordinate " + bundleCoordinate.getCoordinate());
+            }
+
+            final ClassLoader detectedClassLoader = 
extensionManager.createInstanceClassLoader(type, identifier, bundle, 
classpathUrls == null ? Collections.emptySet() : classpathUrls);
+            final Class<?> rawClass = Class.forName(type, true, 
detectedClassLoader);
+            Thread.currentThread().setContextClassLoader(detectedClassLoader);
+
+            final Class<? extends ControllerService> controllerServiceClass = 
rawClass.asSubclass(ControllerService.class);
+            final ControllerService serviceImpl = 
controllerServiceClass.newInstance();
+            final StandardControllerServiceInvocationHandler invocationHandler 
= new StandardControllerServiceInvocationHandler(extensionManager, serviceImpl);
+
+            // extract all interfaces... controllerServiceClass is non null so
+            // getAllInterfaces is non null
+            final List<Class<?>> interfaceList = 
ClassUtils.getAllInterfaces(controllerServiceClass);
+            final Class<?>[] interfaces = interfaceList.toArray(new 
Class<?>[0]);
+
+            // NOTE(review): detectedClassLoader was already handed to Class.forName and
+            // setContextClassLoader above, so this null branch only matters if
+            // createInstanceClassLoader can return null - confirm.
+            final ControllerService proxiedService;
+            if (detectedClassLoader == null) {
+                proxiedService = (ControllerService) 
Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, 
invocationHandler);
+            } else {
+                proxiedService = (ControllerService) 
Proxy.newProxyInstance(detectedClassLoader, interfaces, invocationHandler);
+            }
+
+            logger.info("Created Controller Service of type {} with identifier 
{}", type, identifier);
+            final ComponentLog serviceLogger = new 
SimpleProcessLogger(identifier, serviceImpl);
+            final TerminationAwareLogger terminationAwareLogger = new 
TerminationAwareLogger(serviceLogger);
+
+            // Initialization is performed on the real implementation, not on the proxy.
+            final StateManager stateManager = 
stateManagerProvider.getStateManager(identifier);
+            final ControllerServiceInitializationContext initContext = new 
StandardControllerServiceInitializationContext(identifier, 
terminationAwareLogger,
+                serviceProvider, stateManager, kerberosConfig);
+            serviceImpl.initialize(initContext);
+
+            final LoggableComponent<ControllerService> 
originalLoggableComponent = new LoggableComponent<>(serviceImpl, 
bundleCoordinate, terminationAwareLogger);
+            final LoggableComponent<ControllerService> 
proxiedLoggableComponent = new LoggableComponent<>(proxiedService, 
bundleCoordinate, terminationAwareLogger);
+
+            final ComponentVariableRegistry componentVarRegistry = new 
StandardComponentVariableRegistry(this.variableRegistry);
+            final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(serviceProvider, componentVarRegistry);
+            final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(originalLoggableComponent, 
proxiedLoggableComponent, invocationHandler,
+                identifier, validationContextFactory, serviceProvider, 
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
+            serviceNode.setName(rawClass.getSimpleName());
+
+            // The handler is created before the node exists, so the node is attached afterwards.
+            invocationHandler.setServiceNode(serviceNode);
+            return serviceNode;
+        } finally {
+            // NOTE(review): when the original context class loader was null it is not
+            // restored, leaving the instance class loader on the thread - confirm intended.
+            if (ctxClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(ctxClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Creates a placeholder node for a Controller Service that could not be
+     * instantiated. The node wraps a GhostControllerService carrying the
+     * requested identifier and type, and is given a
+     * "(Missing) &lt;simple class name&gt;" component type.
+     *
+     * @return the ghost controller service node
+     */
+    private ControllerServiceNode createGhostControllerServiceNode() {
+        final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
+        final String componentType = "(Missing) " + simpleClassName;
+
+        // No real implementation exists, so the ghost serves as both the original and the proxied component.
+        final GhostControllerService ghostService = new GhostControllerService(identifier, type);
+        final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(ghostService, bundleCoordinate, null);
+
+        final ControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, ghostService);
+
+        // Build the validation context factory from the component-level registry, as the
+        // processor, reporting task and regular controller service paths do, so the node and
+        // its validation contexts resolve variables through the same registry.
+        final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
+        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry);
+        final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, identifier,
+            validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
+
+        return serviceNode;
+    }
+
+    /**
+     * Instantiates the configured Processor and initializes it with a
+     * StandardProcessorInitializationContext.
+     *
+     * @return the initialized processor together with its logger
+     * @throws ProcessorInstantiationException wrapping any exception raised
+     *             while creating or initializing the processor
+     */
+    private LoggableComponent<Processor> createLoggableProcessor() throws ProcessorInstantiationException {
+        try {
+            final LoggableComponent<Processor> created = createLoggableComponent(Processor.class);
+
+            final ProcessorInitializationContext context = new StandardProcessorInitializationContext(
+                identifier, created.getLogger(), serviceProvider, nodeTypeProvider, kerberosConfig);
+            created.getComponent().initialize(context);
+
+            return created;
+        } catch (final Exception e) {
+            throw new ProcessorInstantiationException(type, e);
+        }
+    }
+
+
+    /**
+     * Instantiates the configured Reporting Task and initializes it with a
+     * StandardReportingInitializationContext that uses the task's simple
+     * class name, the TIMER_DRIVEN strategy and a "1 min" period.
+     *
+     * @return the initialized reporting task together with its logger
+     * @throws ReportingTaskInstantiationException wrapping any exception
+     *             raised while creating or initializing the task
+     */
+    private LoggableComponent<ReportingTask> createLoggableReportingTask() throws ReportingTaskInstantiationException {
+        try {
+            final LoggableComponent<ReportingTask> created = createLoggableComponent(ReportingTask.class);
+
+            final String simpleName = created.getComponent().getClass().getSimpleName();
+            final ReportingInitializationContext context = new StandardReportingInitializationContext(identifier, simpleName,
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", created.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);
+
+            created.getComponent().initialize(context);
+            return created;
+        } catch (final Exception e) {
+            throw new ReportingTaskInstantiationException(type, e);
+        }
+    }
+
+    /**
+     * Loads and instantiates the class named by {@code type} using an
+     * instance class loader obtained from the ExtensionManager, and pairs the
+     * instance with a TerminationAwareLogger.
+     * <p>
+     * The thread's context class loader is set to the instance class loader
+     * while the component is constructed.
+     *
+     * @param nodeType the component interface the new instance is cast to
+     * @param <T> the kind of component being created
+     * @return the new instance together with its logger and bundle coordinate
+     * @throws ClassNotFoundException if {@code type} cannot be loaded
+     * @throws IllegalAccessException if the class cannot be instantiated
+     * @throws InstantiationException if the class cannot be instantiated
+     * @throws IllegalStateException if no bundle exists for the configured coordinate
+     */
+    private <T extends ConfigurableComponent> LoggableComponent<T> 
createLoggableComponent(Class<T> nodeType) throws ClassNotFoundException, 
IllegalAccessException, InstantiationException {
+        // Remember the caller's context class loader so it can be put back in the finally block.
+        final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
+            if (bundle == null) {
+                throw new IllegalStateException("Unable to find bundle for 
coordinate " + bundleCoordinate.getCoordinate());
+            }
+
+            final ClassLoader detectedClassLoader = 
extensionManager.createInstanceClassLoader(type, identifier, bundle, 
classpathUrls == null ? Collections.emptySet() : classpathUrls);
+            final Class<?> rawClass = Class.forName(type, true, 
detectedClassLoader);
+            Thread.currentThread().setContextClassLoader(detectedClassLoader);
+
+            final Object extensionInstance = rawClass.newInstance();
+            final ComponentLog componentLog = new 
SimpleProcessLogger(identifier, extensionInstance);
+            final TerminationAwareLogger terminationAwareLogger = new 
TerminationAwareLogger(componentLog);
+
+            // Class.cast throws ClassCastException if the loaded class is not a nodeType.
+            final T cast = nodeType.cast(extensionInstance);
+            return new LoggableComponent<>(cast, bundleCoordinate, 
terminationAwareLogger);
+        } finally {
+            // NOTE(review): when the original context class loader was null it is not
+            // restored, leaving the instance class loader on the thread - confirm intended.
+            if (ctxClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(ctxClassLoader);
+            }
+        }
+    }
+}

Reply via email to